diff --git a/.gitignore b/.gitignore index b23f66f8..e94d2315 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,5 @@ docs/_build/ docs/source/api/ #edgetest -.edgetest/ \ No newline at end of file +.edgetest/ +tmp/ diff --git a/CODEOWNERS b/CODEOWNERS index 982c16cb..a109fdd9 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1 +1 @@ -* @fdosani @elzzhu @ak-gupta +* @fdosani @NikhilJArora @ak-gupta diff --git a/README.rst b/README.md similarity index 58% rename from README.rst rename to README.md index 095c42a5..c80f30ad 100644 --- a/README.rst +++ b/README.md @@ -1,11 +1,11 @@ -.. image:: https://img.shields.io/pypi/dm/datacompy - :target: https://pypi.org/project/datacompy/ -.. image:: https://img.shields.io/badge/code%20style-black-000000.svg - :target: https://github.com/ambv/black +# DataComPy + +![PyPI - Python Version](https://img.shields.io/pypi/pyversions/datacompy) +[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black) +[![PyPI version](https://badge.fury.io/py/datacompy.svg)](https://badge.fury.io/py/datacompy) +[![Anaconda-Server Badge](https://anaconda.org/conda-forge/datacompy/badges/version.svg)](https://anaconda.org/conda-forge/datacompy) +![PyPI - Downloads](https://img.shields.io/pypi/dm/datacompy) -========= -DataComPy -========= DataComPy is a package to compare two Pandas DataFrames. Originally started to be something of a replacement for SAS's ``PROC COMPARE`` for Pandas DataFrames @@ -13,15 +13,33 @@ with some more functionality than just ``Pandas.DataFrame.equals(Pandas.DataFram (in that it prints out some stats, and lets you tweak how accurate matches have to be). Then extended to carry that functionality over to Spark Dataframes. -Quick Installation -================== +## Quick Installation + +```shell +pip install datacompy +``` + +or + +```shell +conda install datacompy +``` + +### Installing extras -:: +If you would like to use Spark or any other backends please make sure you install via extras: - pip install datacompy +```shell +pip install datacompy[spark] +pip install datacompy[dask] +pip install datacompy[duckdb] +pip install datacompy[polars] +pip install datacompy[ray] -Pandas Detail -============= +``` + + +## Pandas Detail DataComPy will try to join two dataframes either on a list of join columns, or on indexes. If the two dataframes have duplicates based on join values, the @@ -33,53 +51,54 @@ dataframe and an identically-named column with ``float64`` dtype in another, it will tell you that the dtypes are different but will still try to compare the values. -Basic Usage ------------ - -.. 
code-block:: python - - from io import StringIO - import pandas as pd - import datacompy - data1 = """acct_id,dollar_amt,name,float_fld,date_fld - 10000001234,123.45,George Maharis,14530.1555,2017-01-01 - 10000001235,0.45,Michael Bluth,1,2017-01-01 - 10000001236,1345,George Bluth,,2017-01-01 - 10000001237,123456,Bob Loblaw,345.12,2017-01-01 - 10000001239,1.05,Lucille Bluth,,2017-01-01 - """ - - data2 = """acct_id,dollar_amt,name,float_fld - 10000001234,123.4,George Michael Bluth,14530.155 - 10000001235,0.45,Michael Bluth, - 10000001236,1345,George Bluth,1 - 10000001237,123456,Robert Loblaw,345.12 - 10000001238,1.05,Loose Seal Bluth,111 - """ - - df1 = pd.read_csv(StringIO(data1)) - df2 = pd.read_csv(StringIO(data2)) - - compare = datacompy.Compare( - df1, - df2, - join_columns='acct_id', #You can also specify a list of columns - abs_tol=0, #Optional, defaults to 0 - rel_tol=0, #Optional, defaults to 0 - df1_name='Original', #Optional, defaults to 'df1' - df2_name='New' #Optional, defaults to 'df2' - ) - compare.matches(ignore_extra_columns=False) - # False - - # This method prints out a human-readable report summarizing and sampling differences - print(compare.report()) +### Basic Usage + +```python + +from io import StringIO +import pandas as pd +import datacompy + +data1 = """acct_id,dollar_amt,name,float_fld,date_fld +10000001234,123.45,George Maharis,14530.1555,2017-01-01 +10000001235,0.45,Michael Bluth,1,2017-01-01 +10000001236,1345,George Bluth,,2017-01-01 +10000001237,123456,Bob Loblaw,345.12,2017-01-01 +10000001239,1.05,Lucille Bluth,,2017-01-01 +""" + +data2 = """acct_id,dollar_amt,name,float_fld +10000001234,123.4,George Michael Bluth,14530.155 +10000001235,0.45,Michael Bluth, +10000001236,1345,George Bluth,1 +10000001237,123456,Robert Loblaw,345.12 +10000001238,1.05,Loose Seal Bluth,111 +""" + +df1 = pd.read_csv(StringIO(data1)) +df2 = pd.read_csv(StringIO(data2)) + +compare = datacompy.Compare( + df1, + df2, + join_columns='acct_id', #You can also specify a list of columns + abs_tol=0, #Optional, defaults to 0 + rel_tol=0, #Optional, defaults to 0 + df1_name='Original', #Optional, defaults to 'df1' + df2_name='New' #Optional, defaults to 'df2' + ) +compare.matches(ignore_extra_columns=False) +# False + +# This method prints out a human-readable report summarizing and sampling differences +print(compare.report()) +``` See docs for more detailed usage instructions and an example of the report output. -Things that are happening behind the scenes -------------------------------------------- + +### Things that are happening behind the scenes - You pass in two dataframes (``df1``, ``df2``) to ``datacompy.Compare`` and a column to join on (or list of columns) to ``join_columns``. By default the @@ -114,16 +133,102 @@ Things that are happening behind the scenes - You can turn on logging to see more detailed logs. -.. _spark-detail: - -Spark Detail -============ - -.. important:: - - With version ``v0.9.0`` SparkCompare now uses Null Safe (``<=>``) comparisons - -.. +## Fugue Detail + +[Fugue](https://github.com/fugue-project/fugue) is a Python library that provides a unified interface +for data processing on Pandas, DuckDB, Polars, Arrow, Spark, Dask, Ray, and many other backends. +DataComPy integrates with Fugue to provide a simple way to compare data across these backends. + +### Basic Usage + +The following usage example compares two Pandas dataframes, it is equivalent to the Pandas example above. 
+ +```python +from io import StringIO +import pandas as pd +import datacompy + +data1 = """acct_id,dollar_amt,name,float_fld,date_fld +10000001234,123.45,George Maharis,14530.1555,2017-01-01 +10000001235,0.45,Michael Bluth,1,2017-01-01 +10000001236,1345,George Bluth,,2017-01-01 +10000001237,123456,Bob Loblaw,345.12,2017-01-01 +10000001239,1.05,Lucille Bluth,,2017-01-01 +""" + +data2 = """acct_id,dollar_amt,name,float_fld +10000001234,123.4,George Michael Bluth,14530.155 +10000001235,0.45,Michael Bluth, +10000001236,1345,George Bluth,1 +10000001237,123456,Robert Loblaw,345.12 +10000001238,1.05,Loose Seal Bluth,111 +""" + +df1 = pd.read_csv(StringIO(data1)) +df2 = pd.read_csv(StringIO(data2)) + +datacompy.is_match( + df1, + df2, + join_columns='acct_id', #You can also specify a list of columns + abs_tol=0, #Optional, defaults to 0 + rel_tol=0, #Optional, defaults to 0 + df1_name='Original', #Optional, defaults to 'df1' + df2_name='New' #Optional, defaults to 'df2' +) +# False + +# This method prints out a human-readable report summarizing and sampling differences +print(datacompy.report( + df1, + df2, + join_columns='acct_id', #You can also specify a list of columns + abs_tol=0, #Optional, defaults to 0 + rel_tol=0, #Optional, defaults to 0 + df1_name='Original', #Optional, defaults to 'df1' + df2_name='New' #Optional, defaults to 'df2' +)) +``` + +In order to compare dataframes of different backends, you just need to replace ``df1`` and ``df2`` with +dataframes of different backends. Just pass in Dataframes such as Pandas dataframes, DuckDB relations, +Polars dataframes, Arrow tables, Spark dataframes, Dask dataframes or Ray datasets. For example, +to compare a Pandas dataframe with a Spark dataframe: + +```python +from pyspark.sql import SparkSession + +spark = SparkSession.builder.getOrCreate() +spark_df2 = spark.createDataFrame(df2) +datacompy.is_match( + df1, + spark_df2, + join_columns='acct_id', +) +``` + +Notice that in order to use a specific backend, you need to have the corresponding library installed. +For example, if you want compare Ray datasets, you must do + +```shell +pip install datacompy[ray] +``` + + +### How it works + +DataComPy uses Fugue to partition the two dataframes into chunks, and then compare each chunk in parallel +using the Pandas-based ``Compare``. The comparison results are then aggregated to produce the final result. +Different from the join operation used in ``SparkCompare``, the Fugue version uses the ``cogroup -> map`` +like semantic (not exactly the same, Fugue adopts a coarse version to achieve great performance), which +guarantees full data comparison with consistent result compared to Pandas-based ``Compare``. + + +## Spark Detail + +:::{important} +With version ``v0.9.0`` SparkCompare now uses Null Safe (``<=>``) comparisons +::: DataComPy's ``SparkCompare`` class will join two dataframes either on a list of join columns. It has the capability to map column names that may be different in each @@ -143,8 +248,8 @@ are that your data is too large to fit into memory, or you're comparing data that works well in a Spark environment, like partitioned Parquet, CSV, or JSON files, or Cerebro tables. -Performance Implications ------------------------- +### Performance Implications + Spark scales incredibly well, so you can use ``SparkCompare`` to compare billions of rows of data, provided you spin up a big enough cluster. 
Still, @@ -167,10 +272,10 @@ cliched realm of "big data": need to ensure that you have enough free cache memory before you do this, so this parameter is set to False by default. -Basic Usage ------------ -.. code-block:: python +### Basic Usage + +```python import datetime import datacompy @@ -207,9 +312,9 @@ Basic Usage # This prints out a human-readable report summarizing differences comparison.report() +``` -Using SparkCompare on EMR or standalone Spark ---------------------------------------------- +### Using SparkCompare on EMR or standalone Spark 1. Set proxy variables 2. Create a virtual environment, if desired (``virtualenv venv; source venv/bin/activate``) @@ -221,8 +326,7 @@ Using SparkCompare on EMR or standalone Spark (note that your version of py4j may differ depending on the version of Spark you're using) -Using SparkCompare on Databricks --------------------------------- +### Using SparkCompare on Databricks 1. Clone this repository locally 2. Create a datacompy egg by running ``python setup.py bdist_egg`` from the repo root directory. @@ -236,17 +340,16 @@ Using SparkCompare on Databricks you can choose clusters to attach the library to. 6. ``import datacompy`` in a notebook attached to the cluster that the library is attached to and enjoy! -Contributors ------------- + +## Contributors We welcome and appreciate your contributions! Before we can accept any contributions, we ask that you please be sure to -sign the `Contributor License Agreement (CLA) `_. +sign the [Contributor License Agreement (CLA)](https://cla-assistant.io/capitalone/datacompy). -This project adheres to the `Open Source Code of Conduct `_. +This project adheres to the [Open Source Code of Conduct](https://developer.capitalone.com/resources/code-of-conduct/). By participating, you are expected to honor this code. -Roadmap -------- +## Roadmap -Roadmap details can be found `here `_ +Roadmap details can be found [here](https://github.com/capitalone/datacompy/blob/develop/ROADMAP.rst) diff --git a/datacompy/__init__.py b/datacompy/__init__.py index fef944c6..0fe94cf6 100644 --- a/datacompy/__init__.py +++ b/datacompy/__init__.py @@ -1,5 +1,5 @@ # -# Copyright 2020 Capital One Services, LLC +# Copyright 2023 Capital One Services, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-__version__ = "0.9.0" +__version__ = "0.10.0" from datacompy.core import * -from datacompy.sparkcompare import NUMERIC_SPARK_TYPES, SparkCompare +from datacompy.fugue import is_match, report +from datacompy.spark import NUMERIC_SPARK_TYPES, SparkCompare diff --git a/datacompy/core.py b/datacompy/core.py index 5b274071..bdd41142 100644 --- a/datacompy/core.py +++ b/datacompy/core.py @@ -91,7 +91,6 @@ def __init__( ignore_case=False, cast_column_names_lower=True, ): - self.cast_column_names_lower = cast_column_names_lower if on_index and join_columns is not None: raise Exception("Only provide on_index or join_columns") @@ -195,9 +194,7 @@ def _compare(self, ignore_spaces, ignore_case): LOG.info("df1 Pandas.DataFrame.equals df2") else: LOG.info("df1 does not Pandas.DataFrame.equals df2") - LOG.info( - f"Number of columns in common: {len(self.intersect_columns())}" - ) + LOG.info(f"Number of columns in common: {len(self.intersect_columns())}") LOG.debug("Checking column overlap") for col in self.df1_unq_columns(): LOG.info(f"Column in df1 and not in df2: {col}") @@ -275,7 +272,6 @@ def _dataframe_merge(self, ignore_spaces): outer_join = self.df1.merge( self.df2, how="outer", suffixes=("_df1", "_df2"), indicator=True, **params ) - # Clean up temp columns for duplicate row matching if self._any_dupes: if self.on_index: @@ -301,12 +297,8 @@ def _dataframe_merge(self, ignore_spaces): df2_cols ].copy() self.df2_unq_rows.columns = self.df2.columns - LOG.info( - f"Number of rows in df1 and not in df2: {len(self.df1_unq_rows)}" - ) - LOG.info( - f"Number of rows in df2 and not in df1: {len(self.df2_unq_rows)}" - ) + LOG.info(f"Number of rows in df1 and not in df2: {len(self.df1_unq_rows)}") + LOG.info(f"Number of rows in df2 and not in df1: {len(self.df2_unq_rows)}") LOG.debug("Selecting intersecting rows") self.intersect_rows = outer_join[outer_join["_merge"] == "both"].copy() @@ -354,9 +346,7 @@ def _intersect_compare(self, ignore_spaces, ignore_case): match_rate = float(match_cnt) / row_cnt else: match_rate = 0 - LOG.info( - f"{column}: {match_cnt} / {row_cnt} ({match_rate:.2%}) match" - ) + LOG.info(f"{column}: {match_cnt} / {row_cnt} ({match_rate:.2%}) match") self.column_stats.append( { @@ -542,6 +532,12 @@ def report(self, sample_count=10, column_count=10, html_file=None): str The report, formatted kinda nicely. 
""" + + def df_to_str(pdf): + if not self.on_index: + pdf = pdf.reset_index(drop=True) + return pdf.to_string() + # Header report = render("header.txt") df_header = pd.DataFrame( @@ -640,25 +636,33 @@ def report(self, sample_count=10, column_count=10, html_file=None): report += "-------------------------------\n" report += "\n" for sample in match_sample: - report += sample.to_string() + report += df_to_str(sample) report += "\n\n" if min(sample_count, self.df1_unq_rows.shape[0]) > 0: - report += f"Sample Rows Only in {self.df1_name} (First {column_count} Columns)\n" - report += f"---------------------------------------{'-' * len(self.df1_name)}\n" + report += ( + f"Sample Rows Only in {self.df1_name} (First {column_count} Columns)\n" + ) + report += ( + f"---------------------------------------{'-' * len(self.df1_name)}\n" + ) report += "\n" columns = self.df1_unq_rows.columns[:column_count] unq_count = min(sample_count, self.df1_unq_rows.shape[0]) - report += self.df1_unq_rows.sample(unq_count)[columns].to_string() + report += df_to_str(self.df1_unq_rows.sample(unq_count)[columns]) report += "\n\n" if min(sample_count, self.df2_unq_rows.shape[0]) > 0: - report += f"Sample Rows Only in {self.df2_name} (First {column_count} Columns)\n" - report += f"---------------------------------------{'-' * len(self.df2_name)}\n" + report += ( + f"Sample Rows Only in {self.df2_name} (First {column_count} Columns)\n" + ) + report += ( + f"---------------------------------------{'-' * len(self.df2_name)}\n" + ) report += "\n" columns = self.df2_unq_rows.columns[:column_count] unq_count = min(sample_count, self.df2_unq_rows.shape[0]) - report += self.df2_unq_rows.sample(unq_count)[columns].to_string() + report += df_to_str(self.df2_unq_rows.sample(unq_count)[columns]) report += "\n\n" if html_file: diff --git a/datacompy/fugue.py b/datacompy/fugue.py new file mode 100644 index 00000000..74b474a8 --- /dev/null +++ b/datacompy/fugue.py @@ -0,0 +1,633 @@ +# +# Copyright 2023 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Compare two DataFrames that are supported by Fugue +""" + +import logging +import pickle +from collections import defaultdict +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union + +import fugue.api as fa +import pandas as pd +import pyarrow as pa +from fugue import AnyDataFrame + +from .core import Compare, render + +LOG = logging.getLogger(__name__) +HASH_COL = "__datacompy__hash__" + + +def is_match( + df1: AnyDataFrame, + df2: AnyDataFrame, + join_columns: Union[str, List[str]], + abs_tol: float = 0, + rel_tol: float = 0, + df1_name: str = "df1", + df2_name: str = "df2", + ignore_spaces: bool = False, + ignore_case: bool = False, + cast_column_names_lower: bool = True, + parallelism: Optional[int] = None, + strict_schema: bool = False, +) -> bool: + """Check whether two dataframes match. + + Both df1 and df2 should be dataframes containing all of the join_columns, + with unique column names. 
Differences between values are compared to + abs_tol + rel_tol * abs(df2['value']). + + Parameters + ---------- + df1 : ``AnyDataFrame`` + First dataframe to check + df2 : ``AnyDataFrame`` + Second dataframe to check + join_columns : list or str, optional + Column(s) to join dataframes on. If a string is passed in, that one + column will be used. + abs_tol : float, optional + Absolute tolerance between two values. + rel_tol : float, optional + Relative tolerance between two values. + df1_name : str, optional + A string name for the first dataframe. This allows the reporting to + print out an actual name instead of "df1", and allows human users to + more easily track the dataframes. + df2_name : str, optional + A string name for the second dataframe + ignore_spaces : bool, optional + Flag to strip whitespace (including newlines) from string columns (including any join + columns) + ignore_case : bool, optional + Flag to ignore the case of string columns + cast_column_names_lower: bool, optional + Boolean indicator that controls of column names will be cast into lower case + parallelism: int, optional + An integer representing the amount of parallelism. Entering a value for this + will force to use of Fugue over just vanilla Pandas + strict_schema: bool, optional + The schema must match exactly if set to ``True``. This includes the names and types. Allows for a fast fail. + + Returns + ------- + bool + Returns boolean as to if the DataFrames match. + """ + if ( + isinstance(df1, pd.DataFrame) + and isinstance(df2, pd.DataFrame) + and parallelism is None # user did not specify parallelism + and fa.get_current_parallelism() == 1 # currently on a local execution engine + ): + comp = Compare( + df1=df1, + df2=df2, + join_columns=join_columns, + abs_tol=abs_tol, + rel_tol=rel_tol, + df1_name=df1_name, + df2_name=df2_name, + ignore_spaces=ignore_spaces, + ignore_case=ignore_case, + cast_column_names_lower=cast_column_names_lower, + ) + return comp.matches() + + try: + matches = _distributed_compare( + df1=df1, + df2=df2, + join_columns=join_columns, + return_obj_func=lambda comp: comp.matches(), + abs_tol=abs_tol, + rel_tol=rel_tol, + df1_name=df1_name, + df2_name=df2_name, + ignore_spaces=ignore_spaces, + ignore_case=ignore_case, + cast_column_names_lower=cast_column_names_lower, + parallelism=parallelism, + strict_schema=strict_schema, + ) + except _StrictSchemaError: + return False + + return all(matches) + + +def report( + df1: AnyDataFrame, + df2: AnyDataFrame, + join_columns: Union[str, List[str]], + abs_tol: float = 0, + rel_tol: float = 0, + df1_name: str = "df1", + df2_name: str = "df2", + ignore_spaces: bool = False, + ignore_case: bool = False, + cast_column_names_lower: bool = True, + sample_count=10, + column_count=10, + html_file=None, + parallelism: Optional[int] = None, +) -> None: + """Returns a string representation of a report. The representation can + then be printed or saved to a file. + + Both df1 and df2 should be dataframes containing all of the join_columns, + with unique column names. Differences between values are compared to + abs_tol + rel_tol * abs(df2['value']). + + Parameters + ---------- + df1 : ``AnyDataFrame`` + First dataframe to check + df2 : ``AnyDataFrame`` + Second dataframe to check + join_columns : list or str, optional + Column(s) to join dataframes on. If a string is passed in, that one + column will be used. + abs_tol : float, optional + Absolute tolerance between two values. + rel_tol : float, optional + Relative tolerance between two values. 
+ df1_name : str, optional + A string name for the first dataframe. This allows the reporting to + print out an actual name instead of "df1", and allows human users to + more easily track the dataframes. + df2_name : str, optional + A string name for the second dataframe + ignore_spaces : bool, optional + Flag to strip whitespace (including newlines) from string columns (including any join + columns) + ignore_case : bool, optional + Flag to ignore the case of string columns + cast_column_names_lower: bool, optional + Boolean indicator that controls of column names will be cast into lower case + parallelism: int, optional + An integer representing the amount of parallelism. Entering a value for this + will force to use of Fugue over just vanilla Pandas + strict_schema: bool, optional + The schema must match exactly if set to ``True``. This includes the names and types. Allows for a fast fail. + sample_count : int, optional + The number of sample records to return. Defaults to 10. + column_count : int, optional + The number of columns to display in the sample records output. Defaults to 10. + html_file : str, optional + HTML file name to save report output to. If ``None`` the file creation will be skipped. + + + Returns + ------- + str + The report, formatted kinda nicely. + """ + if isinstance(join_columns, str): + join_columns = [join_columns] + + if ( + isinstance(df1, pd.DataFrame) + and isinstance(df2, pd.DataFrame) + and parallelism is None # user did not specify parallelism + and fa.get_current_parallelism() == 1 # currently on a local execution engine + ): + comp = Compare( + df1=df1, + df2=df2, + join_columns=join_columns, + abs_tol=abs_tol, + rel_tol=rel_tol, + df1_name=df1_name, + df2_name=df2_name, + ignore_spaces=ignore_spaces, + ignore_case=ignore_case, + cast_column_names_lower=cast_column_names_lower, + ) + return comp.report( + sample_count=sample_count, column_count=column_count, html_file=html_file + ) + + res = _distributed_compare( + df1=df1, + df2=df2, + join_columns=join_columns, + return_obj_func=lambda c: _get_compare_result( + c, sample_count=sample_count, column_count=column_count + ), + abs_tol=abs_tol, + rel_tol=rel_tol, + df1_name=df1_name, + df2_name=df2_name, + ignore_spaces=ignore_spaces, + ignore_case=ignore_case, + cast_column_names_lower=cast_column_names_lower, + parallelism=parallelism, + strict_schema=False, + ) + + first = res[0] + + def shape0(col: str) -> int: + return sum(x[col][0] for x in res) + + def shape1(col: str) -> int: + return first[col][1] + + def _sum(col: str) -> int: + return sum(x[col] for x in res) + + def _any(col: str) -> int: + return any(x[col] for x in res) + + # Header + rpt = render("header.txt") + df_header = pd.DataFrame( + { + "DataFrame": [df1_name, df2_name], + "Columns": [shape1("df1_shape"), shape1("df2_shape")], + "Rows": [shape0("df1_shape"), shape0("df2_shape")], + } + ) + rpt += df_header[["DataFrame", "Columns", "Rows"]].to_string() + rpt += "\n\n" + + # Column Summary + rpt += render( + "column_summary.txt", + len(first["intersect_columns"]), + len(first["df1_unq_columns"]), + len(first["df2_unq_columns"]), + df1_name, + df2_name, + ) + + # Row Summary + match_on = ", ".join(join_columns) + rpt += render( + "row_summary.txt", + match_on, + abs_tol, + rel_tol, + shape0("intersect_rows_shape"), + shape0("df1_unq_rows_shape"), + shape0("df2_unq_rows_shape"), + shape0("intersect_rows_shape") - _sum("count_matching_rows"), + _sum("count_matching_rows"), + df1_name, + df2_name, + "Yes" if _any("_any_dupes") else "No", + 
) + + column_stats, match_sample = _aggregate_stats(res, sample_count=sample_count) + any_mismatch = len(match_sample) > 0 + + # Column Matching + cnt_intersect = shape0("intersect_rows_shape") + rpt += render( + "column_comparison.txt", + len([col for col in column_stats if col["unequal_cnt"] > 0]), + len([col for col in column_stats if col["unequal_cnt"] == 0]), + sum([col["unequal_cnt"] for col in column_stats]), + ) + + match_stats = [] + for column in column_stats: + if not column["all_match"]: + any_mismatch = True + match_stats.append( + { + "Column": column["column"], + f"{df1_name} dtype": column["dtype1"], + f"{df2_name} dtype": column["dtype2"], + "# Unequal": column["unequal_cnt"], + "Max Diff": column["max_diff"], + "# Null Diff": column["null_diff"], + } + ) + + if any_mismatch: + rpt += "Columns with Unequal Values or Types\n" + rpt += "------------------------------------\n" + rpt += "\n" + df_match_stats = pd.DataFrame(match_stats) + df_match_stats.sort_values("Column", inplace=True) + # Have to specify again for sorting + rpt += df_match_stats[ + [ + "Column", + f"{df1_name} dtype", + f"{df2_name} dtype", + "# Unequal", + "Max Diff", + "# Null Diff", + ] + ].to_string() + rpt += "\n\n" + + if sample_count > 0: + rpt += "Sample Rows with Unequal Values\n" + rpt += "-------------------------------\n" + rpt += "\n" + for sample in match_sample: + rpt += sample.to_string() + rpt += "\n\n" + + df1_unq_rows_samples = [ + r["df1_unq_rows_sample"] for r in res if r["df1_unq_rows_sample"] is not None + ] + if len(df1_unq_rows_samples) > 0: + rpt += f"Sample Rows Only in {df1_name} (First {column_count} Columns)\n" + rpt += f"---------------------------------------{'-' * len(df1_name)}\n" + rpt += "\n" + rpt += _sample( + pd.concat(df1_unq_rows_samples), sample_count=sample_count + ).to_string() + rpt += "\n\n" + + df2_unq_rows_samples = [ + r["df2_unq_rows_sample"] for r in res if r["df2_unq_rows_sample"] is not None + ] + if len(df2_unq_rows_samples) > 0: + rpt += f"Sample Rows Only in {df2_name} (First {column_count} Columns)\n" + rpt += f"---------------------------------------{'-' * len(df2_name)}\n" + rpt += "\n" + rpt += _sample( + pd.concat(df2_unq_rows_samples), sample_count=sample_count + ).to_string() + rpt += "\n\n" + + if html_file: + html_report = rpt.replace("\n", "
").replace(" ", " ") + html_report = f"
{html_report}</pre>
" + with open(html_file, "w") as f: + f.write(html_report) + + return rpt + + +def _distributed_compare( + df1: AnyDataFrame, + df2: AnyDataFrame, + join_columns: Union[str, List[str]], + return_obj_func: Callable[[Compare], Any], + abs_tol: float = 0, + rel_tol: float = 0, + df1_name: str = "df1", + df2_name: str = "df2", + ignore_spaces: bool = False, + ignore_case: bool = False, + cast_column_names_lower: bool = True, + parallelism: Optional[int] = None, + strict_schema: bool = False, +) -> List[Any]: + """Compare the data distributedly using the core Compare class + + Both df1 and df2 should be dataframes containing all of the join_columns, + with unique column names. Differences between values are compared to + abs_tol + rel_tol * abs(df2['value']). + + Parameters + ---------- + df1 : ``AnyDataFrame`` + First dataframe to check + df2 : ``AnyDataFrame`` + Second dataframe to check + join_columns : list or str + Column(s) to join dataframes on. If a string is passed in, that one + column will be used. + return_obj_func : Callable[[Compare], Any] + A function that takes in a Compare object and returns a picklable value. + It determines what is returned from the distributed compare. + abs_tol : float, optional + Absolute tolerance between two values. + rel_tol : float, optional + Relative tolerance between two values. + df1_name : str, optional + A string name for the first dataframe. This allows the reporting to + print out an actual name instead of "df1", and allows human users to + more easily track the dataframes. + df2_name : str, optional + A string name for the second dataframe + ignore_spaces : bool, optional + Flag to strip whitespace (including newlines) from string columns (including any join + columns) + ignore_case : bool, optional + Flag to ignore the case of string columns + cast_column_names_lower: bool, optional + Boolean indicator that controls of column names will be cast into lower case + parallelism: int, optional + An integer representing the amount of parallelism. Entering a value for this + will force to use of Fugue over just vanilla Pandas + strict_schema: bool, optional + The schema must match exactly if set to ``True``. This includes the names and types. Allows for a fast fail. 
+ + Returns + ------- + List[Any] + Returns the list of objects returned from the return_obj_func + """ + + tdf1 = fa.as_fugue_df(df1) + tdf2 = fa.as_fugue_df(df2) + + if isinstance(join_columns, str): + hash_cols = [join_columns] + else: + hash_cols = join_columns + + if cast_column_names_lower: + tdf1 = tdf1.rename( + {col: col.lower() for col in tdf1.schema.names if col != col.lower()} + ) + tdf2 = tdf2.rename( + {col: col.lower() for col in tdf2.schema.names if col != col.lower()} + ) + hash_cols = [col.lower() for col in hash_cols] + + if strict_schema: + if tdf1.schema != tdf2.schema: + raise _StrictSchemaError() + + # check that hash columns exist + assert hash_cols in tdf1.schema, f"{hash_cols} not found in {tdf1.schema}" + assert hash_cols in tdf2.schema, f"{hash_cols} not found in {tdf2.schema}" + + df1_cols = tdf1.schema.names + df2_cols = tdf2.schema.names + str_cols = set(f.name for f in tdf1.schema.fields if pa.types.is_string(f.type)) + bucket = ( + parallelism if parallelism is not None else fa.get_current_parallelism() * 2 + ) + + def _serialize(dfs: Iterable[pd.DataFrame], left: bool) -> Iterable[Dict[str, Any]]: + for df in dfs: + cols = {} + for name in df.columns: + col = df[name] + if name in str_cols: + if ignore_spaces: + col = col.str.strip() + if ignore_case: + col = col.str.lower() + cols[name] = col + data = pd.DataFrame(cols) + gp = pd.util.hash_pandas_object(df[hash_cols], index=False).mod(bucket) + for k, sub in data.groupby(gp, as_index=False, group_keys=False): + yield {"key": k, "left": left, "data": pickle.dumps(sub)} + + ser = fa.union( + fa.transform( + tdf1, + _serialize, + schema="key:int,left:bool,data:binary", + params=dict(left=True), + ), + fa.transform( + tdf2, + _serialize, + schema="key:int,left:bool,data:binary", + params=dict(left=False), + ), + distinct=False, + ) + + def _comp(df: List[Dict[str, Any]]) -> List[List[Any]]: + df1 = ( + pd.concat([pickle.loads(r["data"]) for r in df if r["left"]]) + .sort_values(df1_cols) + .reset_index(drop=True) + ) + df2 = ( + pd.concat([pickle.loads(r["data"]) for r in df if not r["left"]]) + .sort_values(df2_cols) + .reset_index(drop=True) + ) + comp = Compare( + df1=df1, + df2=df2, + join_columns=join_columns, + abs_tol=abs_tol, + rel_tol=rel_tol, + df1_name=df1_name, + df2_name=df2_name, + cast_column_names_lower=False, + ) + return [[pickle.dumps(return_obj_func(comp))]] + + objs = fa.as_array( + fa.transform( + ser, _comp, schema="obj:binary", partition=dict(by="key", num=bucket) + ) + ) + return [pickle.loads(row[0]) for row in objs] + + +def _get_compare_result( + compare: Compare, sample_count: int, column_count: int +) -> Dict[str, Any]: + mismatch_samples: Dict[str, pd.DataFrame] = {} + for column in compare.column_stats: + if not column["all_match"]: + if column["unequal_cnt"] > 0: + mismatch_samples[column["column"]] = compare.sample_mismatch( + column["column"], sample_count, for_display=True + ) + + df1_unq_rows_sample: Any = None + if min(sample_count, compare.df1_unq_rows.shape[0]) > 0: + columns = compare.df1_unq_rows.columns[:column_count] + unq_count = min(sample_count, compare.df1_unq_rows.shape[0]) + df1_unq_rows_sample = _sample(compare.df1_unq_rows, sample_count=unq_count)[ + columns + ] + + df2_unq_rows_sample: Any = None + if min(sample_count, compare.df2_unq_rows.shape[0]) > 0: + columns = compare.df2_unq_rows.columns[:column_count] + unq_count = min(sample_count, compare.df2_unq_rows.shape[0]) + df2_unq_rows_sample = _sample(compare.df2_unq_rows, sample_count=unq_count)[ + 
columns + ] + + return { + "match": compare.matches(), + "count_matching_rows": compare.count_matching_rows(), + "intersect_columns": compare.intersect_columns(), + "df1_shape": compare.df1.shape, + "df2_shape": compare.df2.shape, + "intersect_rows_shape": compare.intersect_rows.shape, + "df1_unq_rows_shape": compare.df1_unq_rows.shape, + "df1_unq_columns": compare.df1_unq_columns(), + "df2_unq_rows_shape": compare.df2_unq_rows.shape, + "df2_unq_columns": compare.df2_unq_columns(), + "abs_tol": compare.abs_tol, + "rel_tol": compare.rel_tol, + "df1_name": compare.df1_name, + "df2_name": compare.df2_name, + "column_stats": compare.column_stats, + "mismatch_samples": mismatch_samples, + "df1_unq_rows_sample": df1_unq_rows_sample, + "df2_unq_rows_sample": df2_unq_rows_sample, + "_any_dupes": compare._any_dupes, + } + + +def _aggregate_stats( + compares, sample_count +) -> Tuple[List[Dict[str, Any]], List[pd.DataFrame]]: + samples = defaultdict(list) + stats = [] + for compare in compares: + stats.extend(compare["column_stats"]) + for k, v in compare["mismatch_samples"].items(): + samples[k].append(v) + + df = pd.DataFrame(stats) + df = ( + df.groupby("column", as_index=False, group_keys=True) + .agg( + { + "match_column": "first", + "match_cnt": "sum", + "unequal_cnt": "sum", + "dtype1": "first", + "dtype2": "first", + "all_match": "all", + "max_diff": "max", + "null_diff": "sum", + } + ) + .reset_index(drop=False) + ) + return df.to_dict(orient="records"), [ + _sample(pd.concat(v), sample_count=sample_count) for v in samples.values() + ] + + +def _sample(df: pd.DataFrame, sample_count: int) -> pd.DataFrame: + if len(df) <= sample_count: + return df.reset_index(drop=True) + return df.sample(n=sample_count, random_state=0).reset_index(drop=True) + + +class _StrictSchemaError(Exception): + """Exception raised when strict schema is enabled and the schemas do not match""" + + pass diff --git a/datacompy/sparkcompare.py b/datacompy/spark.py similarity index 100% rename from datacompy/sparkcompare.py rename to datacompy/spark.py diff --git a/docs/source/conf.py b/docs/source/conf.py index c9ccbb9e..00407433 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -14,15 +14,14 @@ import sys sys.path.insert(0, os.path.abspath(".")) -import sphinx_rtd_theme import datacompy # -- Project information ----------------------------------------------------- project = "datacompy" -copyright = "2020, Capital One" -author = "Ian Robertson, Dan Coates, Usman Azhar" +copyright = "2023, Capital One" +author = "Ian Robertson, Dan Coates, Faisal Dosani" # The full version, including alpha/beta/rc tags version = datacompy.__version__ @@ -34,7 +33,12 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = ["sphinx.ext.autodoc", "sphinx.ext.napoleon", "sphinx.ext.autosummary"] +extensions = [ + "sphinx.ext.autodoc", + "sphinx.ext.napoleon", + "sphinx.ext.autosummary", + "myst_parser", +] napoleon_use_ivar = True numpydoc_show_class_members = False @@ -56,10 +60,27 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = "sphinx_rtd_theme" -html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] +html_theme = "furo" # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. 
They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ["_static"] +# html_static_path = ["_static"] + + +myst_enable_extensions = [ + "dollarmath", + "amsmath", + "deflist", + "fieldlist", + "html_admonition", + "html_image", + "colon_fence", + "smartquotes", + "replacements", + "strikethrough", + "substitution", + "tasklist", + "attrs_inline", + "attrs_block", +] diff --git a/docs/source/fugue_usage.rst b/docs/source/fugue_usage.rst new file mode 100644 index 00000000..1d28c6fc --- /dev/null +++ b/docs/source/fugue_usage.rst @@ -0,0 +1,112 @@ +Fugue Detail +============ + +`Fugue `_ is a Python library that provides a unified interface +for data processing on Pandas, DuckDB, Polars, Arrow, Spark, Dask, Ray, and many other backends. +DataComPy integrates with Fugue to provide a simple way to compare data across these backends. + +Basic Usage +----------- + +The Fugue implementation can be accessed via the: + +- ``datacompy.is_match`` +- and ``datacompy.report`` functions + +Please note this is different than the native Pandas implementation which can be accessed via the ``Compare`` class, +the Fugue implementation is using the ``Compare`` class in the background though. + +The following usage example compares two Pandas dataframes, it is equivalent to the Pandas usage example. + +.. code-block:: python + + from io import StringIO + import pandas as pd + import datacompy + + data1 = """acct_id,dollar_amt,name,float_fld,date_fld + 10000001234,123.45,George Maharis,14530.1555,2017-01-01 + 10000001235,0.45,Michael Bluth,1,2017-01-01 + 10000001236,1345,George Bluth,,2017-01-01 + 10000001237,123456,Bob Loblaw,345.12,2017-01-01 + 10000001239,1.05,Lucille Bluth,,2017-01-01 + """ + + data2 = """acct_id,dollar_amt,name,float_fld + 10000001234,123.4,George Michael Bluth,14530.155 + 10000001235,0.45,Michael Bluth, + 10000001236,1345,George Bluth,1 + 10000001237,123456,Robert Loblaw,345.12 + 10000001238,1.05,Loose Seal Bluth,111 + """ + + df1 = pd.read_csv(StringIO(data1)) + df2 = pd.read_csv(StringIO(data2)) + + datacompy.is_match( + df1, + df2, + join_columns='acct_id', #You can also specify a list of columns + abs_tol=0, #Optional, defaults to 0 + rel_tol=0, #Optional, defaults to 0 + df1_name='Original', #Optional, defaults to 'df1' + df2_name='New' #Optional, defaults to 'df2' + ) + # False + + # This method prints out a human-readable report summarizing and sampling differences + print(datacompy.report( + df1, + df2, + join_columns='acct_id', #You can also specify a list of columns + abs_tol=0, #Optional, defaults to 0 + rel_tol=0, #Optional, defaults to 0 + df1_name='Original', #Optional, defaults to 'df1' + df2_name='New' #Optional, defaults to 'df2' + )) + + +Cross Comparing +--------------- + +In order to compare dataframes of different backends, you just need to replace ``df1`` and ``df2`` with +dataframes of different backends. Just pass in Dataframes such as Pandas dataframes, DuckDB relations, +Polars dataframes, Arrow tables, Spark dataframes, Dask dataframes or Ray datasets. For example, +to compare a Pandas dataframe with a Spark dataframe: + +.. code-block:: python + + from pyspark.sql import SparkSession + + spark = SparkSession.builder.getOrCreate() + spark_df2 = spark.createDataFrame(df2) + datacompy.is_match( + df1, + spark_df2, + join_columns='acct_id', + ) + +Notice that in order to use a specific backend, you need to have the corresponding library installed. 
+For example, if you want compare Ray datasets, you must do + +:: + + pip install datacompy[ray] + + +How it works +------------ + +DataComPy uses Fugue to partition the two dataframes into chunks, and then compare each chunk in parallel +using the Pandas-based ``Compare``. The comparison results are then aggregated to produce the final result. +Different from the join operation used in ``SparkCompare``, the Fugue version uses the ``cogroup -> map`` +like semantic (not exactly the same, Fugue adopts a coarse version to achieve great performance), which +guarantees full data comparison with consistent result compared to Pandas-based ``Compare``. + + +Future releases +--------------- + +We are hoping to pilot Fugue for the community in future releases (0.10+) and gather feedback. With Fugue we get the +benefits of not having to maintain Framework specific code, and also having cross-framework compatibility. We may in +future depending on feedback deprecate ``SparkCompare`` in favour of just using Fugue to manage non-Pandas use cases. diff --git a/docs/source/index.rst b/docs/source/index.rst index 4c41fec7..bcb00f66 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -1,5 +1,6 @@ -.. include:: ../../README.rst +.. include:: ../../README.md + :parser: myst_parser.sphinx_ Contents ======== @@ -10,6 +11,7 @@ Contents Installation Pandas Usage Spark Usage + Fugue Usage Developer Instructions .. toctree:: diff --git a/pytest.ini b/pytest.ini index 9bb04983..3036b87b 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,10 @@ [pytest] spark_options = - spark.sql.catalogImplementation: in-memory - spark.master: local \ No newline at end of file + spark.master: local[*] + spark.sql.catalogImplementation: in-memory + spark.sql.shuffle.partitions: 4 + spark.default.parallelism: 4 + spark.executor.cores: 4 + spark.sql.execution.arrow.pyspark.enabled: true + spark.sql.execution.arrow.enabled: false + spark.sql.adaptive.enabled: false \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index a11cffd2..1a316e7d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,16 +35,27 @@ install_requires = pandas<=1.5.3,>=0.25.0 numpy<=1.24.2,>=1.22.0 ordered-set<=4.1.0,>=4.0.2 + fugue<=0.8.4,>=0.8.4 [options.package_data] * = templates/* [options.extras_require] +duckdb = + fugue[duckdb] +polars = + fugue[polars] spark = - pyspark>=2.2.0 + fugue[spark] + cloudpickle +dask = + fugue[dask] +ray = + fugue[ray] docs = sphinx - sphinx_rtd_theme + furo + myst-parser tests = pytest pytest-cov @@ -66,6 +77,7 @@ dev = pytest pytest-cov pytest-spark + fugue[spark,duckdb,polars] pre-commit black isort @@ -96,4 +108,5 @@ upgrade = pandas numpy ordered-set + fugue diff --git a/tests/test_fugue.py b/tests/test_fugue.py new file mode 100644 index 00000000..fe074db3 --- /dev/null +++ b/tests/test_fugue.py @@ -0,0 +1,270 @@ +# +# Copyright 2023 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Testing out the fugue is_match functionality +""" +from io import StringIO + +import duckdb +import fugue.api as fa +import numpy as np +import pandas as pd +import polars as pl +import pytest +from pytest import raises + +from datacompy import Compare, is_match, report + + +@pytest.fixture +def ref_df(): + np.random.seed(0) + return pd.DataFrame( + dict( + a=np.random.randint(0, 10, 100), + b=np.random.rand(100), + c=np.random.choice(["aaa", "b_c", "csd"], 100), + ) + ) + + +@pytest.fixture +def shuffle_df(ref_df): + return ref_df.sample(frac=1.0) + + +@pytest.fixture +def float_off_df(shuffle_df): + return shuffle_df.assign(b=shuffle_df.b + 0.0001) + + +@pytest.fixture +def upper_case_df(shuffle_df): + return shuffle_df.assign(c=shuffle_df.c.str.upper()) + + +@pytest.fixture +def space_df(shuffle_df): + return shuffle_df.assign(c=shuffle_df.c + " ") + + +@pytest.fixture +def upper_col_df(shuffle_df): + return shuffle_df.rename(columns={"a": "A"}) + + +@pytest.fixture +def simple_diff_df1(): + return pd.DataFrame(dict(aa=[0, 1, 0], bb=[2.1, 3.1, 4.1])) + + +@pytest.fixture +def simple_diff_df2(): + return pd.DataFrame(dict(aa=[1, 0, 1], bb=[3.1, 4.1, 5.1], cc=["a", "b", "c"])) + + +def test_is_match_native( + ref_df, + shuffle_df, + float_off_df, + upper_case_df, + space_df, + upper_col_df, +): + # defaults to Compare class + assert is_match(ref_df, ref_df.copy(), join_columns="a") + assert not is_match(ref_df, shuffle_df, join_columns="a") + # Fugue + assert is_match(ref_df, shuffle_df, join_columns="a", parallelism=2) + + assert not is_match(ref_df, float_off_df, join_columns="a", parallelism=2) + assert not is_match( + ref_df, float_off_df, abs_tol=0.00001, join_columns="a", parallelism=2 + ) + assert is_match( + ref_df, float_off_df, abs_tol=0.001, join_columns="a", parallelism=2 + ) + assert is_match( + ref_df, float_off_df, abs_tol=0.001, join_columns="a", parallelism=2 + ) + + assert not is_match(ref_df, upper_case_df, join_columns="a", parallelism=2) + assert is_match( + ref_df, upper_case_df, join_columns="a", ignore_case=True, parallelism=2 + ) + + assert not is_match(ref_df, space_df, join_columns="a", parallelism=2) + assert is_match( + ref_df, space_df, join_columns="a", ignore_spaces=True, parallelism=2 + ) + + assert is_match(ref_df, upper_col_df, join_columns="a", parallelism=2) + + with raises(AssertionError): + is_match( + ref_df, + upper_col_df, + join_columns="a", + cast_column_names_lower=False, + parallelism=2, + ) + + +def test_is_match_spark( + spark_session, + ref_df, + shuffle_df, + float_off_df, + upper_case_df, + space_df, + upper_col_df, +): + rdf = spark_session.createDataFrame(ref_df) + + assert is_match(rdf, shuffle_df, join_columns="a") + + assert not is_match(rdf, float_off_df, join_columns="a") + assert not is_match(rdf, float_off_df, abs_tol=0.00001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + + assert not is_match(rdf, upper_case_df, join_columns="a") + assert is_match(rdf, upper_case_df, join_columns="a", ignore_case=True) + + assert not is_match(rdf, space_df, join_columns="a") + assert is_match(rdf, space_df, join_columns="a", ignore_spaces=True) + + assert is_match(rdf, upper_col_df, join_columns="a") + + with raises(AssertionError): + is_match(rdf, upper_col_df, join_columns="a", cast_column_names_lower=False) + + assert is_match( + spark_session.sql("SELECT 'a' AS a, 'b' AS b"), + spark_session.sql("SELECT 'a' AS a, 'b' AS 
b"), + join_columns="a", + ) + + +def test_is_match_polars( + ref_df, + shuffle_df, + float_off_df, + upper_case_df, + space_df, + upper_col_df, +): + rdf = pl.from_pandas(ref_df) + + assert is_match(rdf, shuffle_df, join_columns="a") + + assert not is_match(rdf, float_off_df, join_columns="a") + assert not is_match(rdf, float_off_df, abs_tol=0.00001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + + assert not is_match(rdf, upper_case_df, join_columns="a") + assert is_match(rdf, upper_case_df, join_columns="a", ignore_case=True) + + assert not is_match(rdf, space_df, join_columns="a") + assert is_match(rdf, space_df, join_columns="a", ignore_spaces=True) + + assert is_match(rdf, upper_col_df, join_columns="a") + with raises(AssertionError): + is_match(rdf, upper_col_df, join_columns="a", cast_column_names_lower=False) + + +def test_is_match_duckdb( + ref_df, + shuffle_df, + float_off_df, + upper_case_df, + space_df, + upper_col_df, +): + with duckdb.connect(): + rdf = duckdb.from_df(ref_df) + + assert is_match(rdf, shuffle_df, join_columns="a") + + assert not is_match(rdf, float_off_df, join_columns="a") + assert not is_match(rdf, float_off_df, abs_tol=0.00001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + + assert not is_match(rdf, upper_case_df, join_columns="a") + assert is_match(rdf, upper_case_df, join_columns="a", ignore_case=True) + + assert not is_match(rdf, space_df, join_columns="a") + assert is_match(rdf, space_df, join_columns="a", ignore_spaces=True) + + assert is_match(rdf, upper_col_df, join_columns="a") + with raises(AssertionError): + is_match(rdf, upper_col_df, join_columns="a", cast_column_names_lower=False) + + assert is_match( + duckdb.sql("SELECT 'a' AS a, 'b' AS b"), + duckdb.sql("SELECT 'a' AS a, 'b' AS b"), + join_columns="a", + ) + + +def test_doc_case(): + data1 = """acct_id,dollar_amt,name,float_fld,date_fld + 10000001234,123.45,George Maharis,14530.1555,2017-01-01 + 10000001235,0.45,Michael Bluth,1,2017-01-01 + 10000001236,1345,George Bluth,,2017-01-01 + 10000001237,123456,Bob Loblaw,345.12,2017-01-01 + 10000001239,1.05,Lucille Bluth,,2017-01-01 + """ + + data2 = """acct_id,dollar_amt,name,float_fld + 10000001234,123.4,George Michael Bluth,14530.155 + 10000001235,0.45,Michael Bluth, + 10000001236,1345,George Bluth,1 + 10000001237,123456,Robert Loblaw,345.12 + 10000001238,1.05,Loose Seal Bluth,111 + """ + + df1 = pd.read_csv(StringIO(data1)) + df2 = pd.read_csv(StringIO(data2)) + + assert not is_match( + df1, + df2, + join_columns="acct_id", + abs_tol=0, + rel_tol=0, + df1_name="Original", + df2_name="New", + parallelism=2, + ) + + +def test_report_pandas(simple_diff_df1, simple_diff_df2): + comp = Compare(simple_diff_df1, simple_diff_df2, join_columns=["aa"]) + a = report(simple_diff_df1, simple_diff_df2, ["aa"]) + assert a == comp.report() + a = report(simple_diff_df1, simple_diff_df2, "aa", parallelism=2) + assert a == comp.report() + + +def test_report_spark(spark_session, simple_diff_df1, simple_diff_df2): + df1 = spark_session.createDataFrame(simple_diff_df1) + df2 = spark_session.createDataFrame(simple_diff_df2) + comp = Compare(simple_diff_df1, simple_diff_df2, join_columns="aa") + a = report(df1, df2, ["aa"]) + assert a == comp.report() diff --git a/tests/test_sparkcompare.py b/tests/test_spark.py similarity index 99% rename from 
tests/test_sparkcompare.py rename to tests/test_spark.py index 4dd76dc9..4a38211f 100644 --- a/tests/test_sparkcompare.py +++ b/tests/test_spark.py @@ -33,7 +33,7 @@ import datacompy from datacompy import SparkCompare -from datacompy.sparkcompare import _is_comparable +from datacompy.spark import _is_comparable # Turn off py4j debug messages for all tests in this module logging.getLogger("py4j").setLevel(logging.INFO)
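
A quick way to exercise the new Fugue entry points added in ``datacompy/fugue.py`` is sketched below. This is not part of the patch: the sample frames and column names are made up, and it assumes ``datacompy`` 0.10.0 is installed with ``fugue`` (per the updated ``setup.cfg``). Passing ``parallelism`` forces the Fugue code path, mirroring the cases in ``tests/test_fugue.py``.

```python
import pandas as pd

import datacompy  # exposes is_match and report from datacompy.fugue as of 0.10.0

# Hypothetical sample data, loosely following the README example
df1 = pd.DataFrame({"acct_id": [1, 2, 3], "dollar_amt": [123.45, 0.45, 1345.00]})
df2 = pd.DataFrame({"acct_id": [1, 2, 3], "dollar_amt": [123.46, 0.45, 1345.00]})

# parallelism forces the Fugue path: rows are hash-bucketed on the join
# column(s) and each bucket is compared with the Pandas-based Compare class.
print(datacompy.is_match(df1, df2, join_columns="acct_id", abs_tol=0.001, parallelism=2))

# report() aggregates the per-bucket results into the same human-readable
# report that Compare.report() produces for plain Pandas inputs.
print(datacompy.report(df1, df2, join_columns="acct_id", parallelism=2))

# To compare across backends, swap either argument for e.g. a Spark or Polars
# dataframe with the same columns (the corresponding extra must be installed).
```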