From 87699aad9f1319541a5458c0fc18bbf4aa32df11 Mon Sep 17 00:00:00 2001 From: JacobDawang Date: Thu, 7 Dec 2023 14:33:11 -0700 Subject: [PATCH 1/8] Test without spark --- .github/workflows/test-package.yml | 42 +- pyproject.toml | 12 +- tests/test_fugue.py | 672 ----------------------------- tests/test_spark.py | 3 + 4 files changed, 48 insertions(+), 681 deletions(-) delete mode 100644 tests/test_fugue.py diff --git a/.github/workflows/test-package.yml b/.github/workflows/test-package.yml index 1e4536b3..71548d32 100644 --- a/.github/workflows/test-package.yml +++ b/.github/workflows/test-package.yml @@ -10,38 +10,64 @@ on: branches: [develop, main] jobs: - build: + test-complete-install: - runs-on: ubuntu-latest + runs-on: ubuntu-latest strategy: fail-fast: false matrix: python-version: [3.8, 3.9, '3.10'] spark-version: [3.0.3, 3.1.3, 3.2.3, 3.3.1, 3.4.0] env: - PYTHON_VERSION: ${{ matrix.python-version }} + PYTHON_VERSION: ${{ matrix.python-version }} SPARK_VERSION: ${{ matrix.spark-version }} steps: - - uses: actions/checkout@v2 - + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v2 with: python-version: ${{ matrix.python-version }} - + - name: Setup Java JDK uses: actions/setup-java@v3 with: java-version: '8' distribution: 'adopt' - + - name: Install Spark run: | python -m pip install --upgrade pip python -m pip install pytest pytest-spark pypandoc python -m pip install pyspark==${{ matrix.spark-version }} - python -m pip install .[dev,spark] + python -m pip install .[tests,tests-spark,spark] + - name: Test with pytest + run: | + python -m pytest tests/ + + test-bare-install: + + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: [3.8, 3.9, '3.10'] + env: + PYTHON_VERSION: ${{ matrix.python-version }} + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Install Spark + run: | + python -m pip install --upgrade pip + python -m pip install .[tests] - name: Test with pytest run: | python -m pytest tests/ diff --git a/pyproject.toml b/pyproject.toml index ae11c20c..cf653276 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,10 @@ Repository = "https://github.com/capitalone/datacompy.git" "Bug Tracker" = "https://github.com/capitalone/datacompy/issues" "Source Code" = "https://github.com/capitalone/datacompy" +[build-system] +requires = ["setuptools>=64.0.0"] +build-backend = "setuptools.build_meta" + [tool.setuptools] packages = ["datacompy"] zip-safe = false @@ -67,8 +71,13 @@ docs = [ tests = [ "pytest", "pytest-cov", +] + +tests-spark = [ + "pytest", + "pytest-cov", "pytest-spark", - "fugue[polars,duckdb,spark]", + "fugue[spark]", ] qa = [ "pre-commit", @@ -90,6 +99,7 @@ dev = [ "datacompy[spark]", "datacompy[docs]", "datacompy[tests]", + "datacompy[tests-spark]", "datacompy[qa]", "datacompy[build]", ] diff --git a/tests/test_fugue.py b/tests/test_fugue.py deleted file mode 100644 index 47f7008b..00000000 --- a/tests/test_fugue.py +++ /dev/null @@ -1,672 +0,0 @@ -# -# Copyright 2023 Capital One Services, LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Testing out the fugue is_match functionality -""" -from io import StringIO - -import duckdb -import numpy as np -import pandas as pd -import polars as pl -import pytest -from ordered_set import OrderedSet -from pytest import raises - -from datacompy import ( - Compare, - all_columns_match, - all_rows_overlap, - intersect_columns, - is_match, - report, - unq_columns, -) - - -@pytest.fixture -def ref_df(): - np.random.seed(0) - - df1 = pd.DataFrame( - dict( - a=np.random.randint(0, 10, 100), - b=np.random.rand(100), - c=np.random.choice(["aaa", "b_c", "csd"], 100), - ) - ) - df1_copy = df1.copy() - df2 = df1.copy().drop(columns=["c"]) - df3 = df1.copy().drop(columns=["a", "b"]) - df4 = pd.DataFrame( - dict( - a=np.random.randint(1, 12, 100), # shift the join col - b=np.random.rand(100), - c=np.random.choice(["aaa", "b_c", "csd"], 100), - ) - ) - return [df1, df1_copy, df2, df3, df4] - - -@pytest.fixture -def shuffle_df(ref_df): - return ref_df[0].sample(frac=1.0) - - -@pytest.fixture -def float_off_df(shuffle_df): - return shuffle_df.assign(b=shuffle_df.b + 0.0001) - - -@pytest.fixture -def upper_case_df(shuffle_df): - return shuffle_df.assign(c=shuffle_df.c.str.upper()) - - -@pytest.fixture -def space_df(shuffle_df): - return shuffle_df.assign(c=shuffle_df.c + " ") - - -@pytest.fixture -def upper_col_df(shuffle_df): - return shuffle_df.rename(columns={"a": "A"}) - - -@pytest.fixture -def simple_diff_df1(): - return pd.DataFrame(dict(aa=[0, 1, 0], bb=[2.1, 3.1, 4.1])).convert_dtypes() - - -@pytest.fixture -def simple_diff_df2(): - return pd.DataFrame( - dict(aa=[1, 0, 1], bb=[3.1, 4.1, 5.1], cc=["a", "b", "c"]) - ).convert_dtypes() - - -@pytest.fixture -def no_intersection_diff_df1(): - np.random.seed(0) - return pd.DataFrame(dict(x=["a"], y=[0.1])).convert_dtypes() - - -@pytest.fixture -def no_intersection_diff_df2(): - return pd.DataFrame(dict(x=["b"], y=[1.1])).convert_dtypes() - - -@pytest.fixture -def large_diff_df1(): - np.random.seed(0) - data = np.random.randint(0, 7, size=10000) - return pd.DataFrame({"x": data, "y": np.array([9] * 10000)}).convert_dtypes() - - -@pytest.fixture -def large_diff_df2(): - np.random.seed(0) - data = np.random.randint(6, 11, size=10000) - return pd.DataFrame({"x": data, "y": np.array([9] * 10000)}).convert_dtypes() - - -def test_is_match_native( - ref_df, - shuffle_df, - float_off_df, - upper_case_df, - space_df, - upper_col_df, -): - # defaults to Compare class - assert is_match(ref_df[0], ref_df[0].copy(), join_columns="a") - assert not is_match(ref_df[0], shuffle_df, join_columns="a") - # Fugue - assert is_match(ref_df[0], shuffle_df, join_columns="a", parallelism=2) - - assert not is_match(ref_df[0], float_off_df, join_columns="a", parallelism=2) - assert not is_match( - ref_df[0], float_off_df, abs_tol=0.00001, join_columns="a", parallelism=2 - ) - assert is_match( - ref_df[0], float_off_df, abs_tol=0.001, join_columns="a", parallelism=2 - ) - assert is_match( - ref_df[0], float_off_df, abs_tol=0.001, join_columns="a", parallelism=2 - ) - - assert not is_match(ref_df[0], upper_case_df, join_columns="a", parallelism=2) - assert 
is_match( - ref_df[0], upper_case_df, join_columns="a", ignore_case=True, parallelism=2 - ) - - assert not is_match(ref_df[0], space_df, join_columns="a", parallelism=2) - assert is_match( - ref_df[0], space_df, join_columns="a", ignore_spaces=True, parallelism=2 - ) - - assert is_match(ref_df[0], upper_col_df, join_columns="a", parallelism=2) - - with raises(AssertionError): - is_match( - ref_df[0], - upper_col_df, - join_columns="a", - cast_column_names_lower=False, - parallelism=2, - ) - - -def test_is_match_spark( - spark_session, - ref_df, - shuffle_df, - float_off_df, - upper_case_df, - space_df, - upper_col_df, -): - ref_df[0].iteritems = ref_df[0].items # pandas 2 compatibility - rdf = spark_session.createDataFrame(ref_df[0]) - - assert is_match(rdf, shuffle_df, join_columns="a") - - assert not is_match(rdf, float_off_df, join_columns="a") - assert not is_match(rdf, float_off_df, abs_tol=0.00001, join_columns="a") - assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") - assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") - - assert not is_match(rdf, upper_case_df, join_columns="a") - assert is_match(rdf, upper_case_df, join_columns="a", ignore_case=True) - - assert not is_match(rdf, space_df, join_columns="a") - assert is_match(rdf, space_df, join_columns="a", ignore_spaces=True) - - assert is_match(rdf, upper_col_df, join_columns="a") - - with raises(AssertionError): - is_match(rdf, upper_col_df, join_columns="a", cast_column_names_lower=False) - - assert is_match( - spark_session.sql("SELECT 'a' AS a, 'b' AS b"), - spark_session.sql("SELECT 'a' AS a, 'b' AS b"), - join_columns="a", - ) - - -def test_is_match_polars( - ref_df, - shuffle_df, - float_off_df, - upper_case_df, - space_df, - upper_col_df, -): - rdf = pl.from_pandas(ref_df[0]) - - assert is_match(rdf, shuffle_df, join_columns="a") - - assert not is_match(rdf, float_off_df, join_columns="a") - assert not is_match(rdf, float_off_df, abs_tol=0.00001, join_columns="a") - assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") - assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") - - assert not is_match(rdf, upper_case_df, join_columns="a") - assert is_match(rdf, upper_case_df, join_columns="a", ignore_case=True) - - assert not is_match(rdf, space_df, join_columns="a") - assert is_match(rdf, space_df, join_columns="a", ignore_spaces=True) - - assert is_match(rdf, upper_col_df, join_columns="a") - with raises(AssertionError): - is_match(rdf, upper_col_df, join_columns="a", cast_column_names_lower=False) - - -def test_is_match_duckdb( - ref_df, - shuffle_df, - float_off_df, - upper_case_df, - space_df, - upper_col_df, -): - with duckdb.connect(): - rdf = duckdb.from_df(ref_df[0]) - - assert is_match(rdf, shuffle_df, join_columns="a") - - assert not is_match(rdf, float_off_df, join_columns="a") - assert not is_match(rdf, float_off_df, abs_tol=0.00001, join_columns="a") - assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") - assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") - - assert not is_match(rdf, upper_case_df, join_columns="a") - assert is_match(rdf, upper_case_df, join_columns="a", ignore_case=True) - - assert not is_match(rdf, space_df, join_columns="a") - assert is_match(rdf, space_df, join_columns="a", ignore_spaces=True) - - assert is_match(rdf, upper_col_df, join_columns="a") - with raises(AssertionError): - is_match(rdf, upper_col_df, join_columns="a", cast_column_names_lower=False) - - assert is_match( - duckdb.sql("SELECT 
'a' AS a, 'b' AS b"), - duckdb.sql("SELECT 'a' AS a, 'b' AS b"), - join_columns="a", - ) - - -def test_doc_case(): - data1 = """acct_id,dollar_amt,name,float_fld,date_fld - 10000001234,123.45,George Maharis,14530.1555,2017-01-01 - 10000001235,0.45,Michael Bluth,1,2017-01-01 - 10000001236,1345,George Bluth,,2017-01-01 - 10000001237,123456,Bob Loblaw,345.12,2017-01-01 - 10000001239,1.05,Lucille Bluth,,2017-01-01 - """ - - data2 = """acct_id,dollar_amt,name,float_fld - 10000001234,123.4,George Michael Bluth,14530.155 - 10000001235,0.45,Michael Bluth, - 10000001236,1345,George Bluth,1 - 10000001237,123456,Robert Loblaw,345.12 - 10000001238,1.05,Loose Seal Bluth,111 - """ - - df1 = pd.read_csv(StringIO(data1)) - df2 = pd.read_csv(StringIO(data2)) - - assert not is_match( - df1, - df2, - join_columns="acct_id", - abs_tol=0, - rel_tol=0, - df1_name="Original", - df2_name="New", - parallelism=2, - ) - - -def _compare_report(expected, actual, truncate=False): - if truncate: - expected = expected.split("Sample Rows", 1)[0] - actual = actual.split("Sample Rows", 1)[0] - assert expected == actual - - -def test_report_pandas( - simple_diff_df1, - simple_diff_df2, - no_intersection_diff_df1, - no_intersection_diff_df2, - large_diff_df1, - large_diff_df2, -): - comp = Compare(simple_diff_df1, simple_diff_df2, join_columns=["aa"]) - a = report(simple_diff_df1, simple_diff_df2, ["aa"]) - _compare_report(comp.report(), a) - a = report(simple_diff_df1, simple_diff_df2, "aa", parallelism=2) - _compare_report(comp.report(), a) - - comp = Compare( - no_intersection_diff_df1, no_intersection_diff_df2, join_columns=["x"] - ) - a = report(no_intersection_diff_df1, no_intersection_diff_df2, ["x"]) - _compare_report(comp.report(), a) - a = report(no_intersection_diff_df1, no_intersection_diff_df2, "x", parallelism=2) - _compare_report(comp.report(), a) - - # due to https://github.com/capitalone/datacompy/issues/221 - # we can have y as a constant to ensure all the x matches are equal - - comp = Compare(large_diff_df1, large_diff_df2, join_columns=["x"]) - a = report(large_diff_df1, large_diff_df2, ["x"]) - _compare_report(comp.report(), a, truncate=True) - a = report(large_diff_df1, large_diff_df2, "x", parallelism=2) - _compare_report(comp.report(), a, truncate=True) - - -def test_report_spark( - spark_session, - simple_diff_df1, - simple_diff_df2, - no_intersection_diff_df1, - no_intersection_diff_df2, - large_diff_df1, - large_diff_df2, -): - simple_diff_df1.iteritems = simple_diff_df1.items # pandas 2 compatibility - simple_diff_df2.iteritems = simple_diff_df2.items # pandas 2 compatibility - no_intersection_diff_df1.iteritems = ( - no_intersection_diff_df1.items - ) # pandas 2 compatibility - no_intersection_diff_df2.iteritems = ( - no_intersection_diff_df2.items - ) # pandas 2 compatibility - large_diff_df1.iteritems = large_diff_df1.items # pandas 2 compatibility - large_diff_df2.iteritems = large_diff_df2.items # pandas 2 compatibility - - df1 = spark_session.createDataFrame(simple_diff_df1) - df2 = spark_session.createDataFrame(simple_diff_df2) - comp = Compare(simple_diff_df1, simple_diff_df2, join_columns="aa") - a = report(df1, df2, ["aa"]) - _compare_report(comp.report(), a) - - df1 = spark_session.createDataFrame(no_intersection_diff_df1) - df2 = spark_session.createDataFrame(no_intersection_diff_df2) - comp = Compare(no_intersection_diff_df1, no_intersection_diff_df2, join_columns="x") - a = report(df1, df2, ["x"]) - _compare_report(comp.report(), a) - - # due to 
https://github.com/capitalone/datacompy/issues/221 - # we can have y as a constant to ensure all the x matches are equal - - df1 = spark_session.createDataFrame(large_diff_df1) - df2 = spark_session.createDataFrame(large_diff_df2) - comp = Compare(large_diff_df1, large_diff_df2, join_columns="x") - a = report(df1, df2, ["x"]) - _compare_report(comp.report(), a, truncate=True) - - -def test_unique_columns_native(ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - assert unq_columns(df1, df1.copy()) == OrderedSet() - assert unq_columns(df1, df2) == OrderedSet(["c"]) - assert unq_columns(df1, df3) == OrderedSet(["a", "b"]) - assert unq_columns(df1.copy(), df1) == OrderedSet() - assert unq_columns(df3, df2) == OrderedSet(["c"]) - - -def test_unique_columns_spark(spark_session, ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - df1.iteritems = df1.items # pandas 2 compatibility - df1_copy.iteritems = df1_copy.items # pandas 2 compatibility - df2.iteritems = df2.items # pandas 2 compatibility - df3.iteritems = df3.items # pandas 2 compatibility - - sdf1 = spark_session.createDataFrame(df1) - sdf1_copy = spark_session.createDataFrame(df1_copy) - sdf2 = spark_session.createDataFrame(df2) - sdf3 = spark_session.createDataFrame(df3) - - assert unq_columns(sdf1, sdf1_copy) == OrderedSet() - assert unq_columns(sdf1, sdf2) == OrderedSet(["c"]) - assert unq_columns(sdf1, sdf3) == OrderedSet(["a", "b"]) - assert unq_columns(sdf1_copy, sdf1) == OrderedSet() - assert unq_columns(sdf3, sdf2) == OrderedSet(["c"]) - - -def test_unique_columns_polars(ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - pdf1 = pl.from_pandas(df1) - pdf1_copy = pl.from_pandas(df1_copy) - pdf2 = pl.from_pandas(df2) - pdf3 = pl.from_pandas(df3) - - assert unq_columns(pdf1, pdf1_copy) == OrderedSet() - assert unq_columns(pdf1, pdf2) == OrderedSet(["c"]) - assert unq_columns(pdf1, pdf3) == OrderedSet(["a", "b"]) - assert unq_columns(pdf1_copy, pdf1) == OrderedSet() - assert unq_columns(pdf3, pdf2) == OrderedSet(["c"]) - - -def test_unique_columns_duckdb(ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - with duckdb.connect(): - ddf1 = duckdb.from_df(df1) - ddf1_copy = duckdb.from_df(df1_copy) - ddf2 = duckdb.from_df(df2) - ddf3 = duckdb.from_df(df3) - - assert unq_columns(ddf1, ddf1_copy) == OrderedSet() - assert unq_columns(ddf1, ddf2) == OrderedSet(["c"]) - assert unq_columns(ddf1, ddf3) == OrderedSet(["a", "b"]) - assert unq_columns(ddf1_copy, ddf1) == OrderedSet() - assert unq_columns(ddf3, ddf2) == OrderedSet(["c"]) - - -def test_intersect_columns_native(ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - assert intersect_columns(df1, df1_copy) == OrderedSet(["a", "b", "c"]) - assert intersect_columns(df1, df2) == OrderedSet(["a", "b"]) - assert intersect_columns(df1, df3) == OrderedSet(["c"]) - assert intersect_columns(df1_copy, df1) == OrderedSet(["a", "b", "c"]) - assert intersect_columns(df3, df2) == OrderedSet() - - -def test_intersect_columns_spark(spark_session, ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - df1.iteritems = df1.items # pandas 2 compatibility - df1_copy.iteritems = df1_copy.items # pandas 2 compatibility - df2.iteritems = df2.items # pandas 2 compatibility - df3.iteritems = df3.items # pandas 2 compatibility - - sdf1 = spark_session.createDataFrame(df1) - sdf1_copy = 
spark_session.createDataFrame(df1_copy) - sdf2 = spark_session.createDataFrame(df2) - sdf3 = spark_session.createDataFrame(df3) - - assert intersect_columns(sdf1, sdf1_copy) == OrderedSet(["a", "b", "c"]) - assert intersect_columns(sdf1, sdf2) == OrderedSet(["a", "b"]) - assert intersect_columns(sdf1, sdf3) == OrderedSet(["c"]) - assert intersect_columns(sdf1_copy, sdf1) == OrderedSet(["a", "b", "c"]) - assert intersect_columns(sdf3, sdf2) == OrderedSet() - - -def test_intersect_columns_polars(ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - pdf1 = pl.from_pandas(df1) - pdf1_copy = pl.from_pandas(df1_copy) - pdf2 = pl.from_pandas(df2) - pdf3 = pl.from_pandas(df3) - - assert intersect_columns(pdf1, pdf1_copy) == OrderedSet(["a", "b", "c"]) - assert intersect_columns(pdf1, pdf2) == OrderedSet(["a", "b"]) - assert intersect_columns(pdf1, pdf3) == OrderedSet(["c"]) - assert intersect_columns(pdf1_copy, pdf1) == OrderedSet(["a", "b", "c"]) - assert intersect_columns(pdf3, pdf2) == OrderedSet() - - -def test_intersect_columns_duckdb(ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - with duckdb.connect(): - ddf1 = duckdb.from_df(df1) - ddf1_copy = duckdb.from_df(df1_copy) - ddf2 = duckdb.from_df(df2) - ddf3 = duckdb.from_df(df3) - - assert intersect_columns(ddf1, ddf1_copy) == OrderedSet(["a", "b", "c"]) - assert intersect_columns(ddf1, ddf2) == OrderedSet(["a", "b"]) - assert intersect_columns(ddf1, ddf3) == OrderedSet(["c"]) - assert intersect_columns(ddf1_copy, ddf1) == OrderedSet(["a", "b", "c"]) - assert intersect_columns(ddf3, ddf2) == OrderedSet() - - -def test_all_columns_match_native(ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - assert all_columns_match(df1, df1_copy) is True - assert all_columns_match(df1, df2) is False - assert all_columns_match(df1, df3) is False - assert all_columns_match(df1_copy, df1) is True - assert all_columns_match(df3, df2) is False - - -def test_all_columns_match_spark(spark_session, ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - df1.iteritems = df1.items # pandas 2 compatibility - df1_copy.iteritems = df1_copy.items # pandas 2 compatibility - df2.iteritems = df2.items # pandas 2 compatibility - df3.iteritems = df3.items # pandas 2 compatibility - - df1 = spark_session.createDataFrame(df1) - df1_copy = spark_session.createDataFrame(df1_copy) - df2 = spark_session.createDataFrame(df2) - df3 = spark_session.createDataFrame(df3) - - assert all_columns_match(df1, df1_copy) is True - assert all_columns_match(df1, df2) is False - assert all_columns_match(df1, df3) is False - assert all_columns_match(df1_copy, df1) is True - assert all_columns_match(df3, df2) is False - - -def test_all_columns_match_polars(ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - df1 = pl.from_pandas(df1) - df1_copy = pl.from_pandas(df1_copy) - df2 = pl.from_pandas(df2) - df3 = pl.from_pandas(df3) - - assert all_columns_match(df1, df1_copy) is True - assert all_columns_match(df1, df2) is False - assert all_columns_match(df1, df3) is False - assert all_columns_match(df1_copy, df1) is True - assert all_columns_match(df3, df2) is False - - -def test_all_columns_match_duckdb(ref_df): - df1 = ref_df[0] - df1_copy = ref_df[1] - df2 = ref_df[2] - df3 = ref_df[3] - - with duckdb.connect(): - df1 = duckdb.from_df(df1) - df1_copy = duckdb.from_df(df1_copy) - df2 = duckdb.from_df(df2) - df3 = 
duckdb.from_df(df3) - - assert all_columns_match(df1, df1_copy) is True - assert all_columns_match(df1, df2) is False - assert all_columns_match(df1, df3) is False - assert all_columns_match(df1_copy, df1) is True - assert all_columns_match(df3, df2) is False - - -def test_all_rows_overlap_native( - ref_df, - shuffle_df, -): - # defaults to Compare class - assert all_rows_overlap(ref_df[0], ref_df[0].copy(), join_columns="a") - assert all_rows_overlap(ref_df[0], shuffle_df, join_columns="a") - assert not all_rows_overlap(ref_df[0], ref_df[4], join_columns="a") - # Fugue - assert all_rows_overlap(ref_df[0], shuffle_df, join_columns="a", parallelism=2) - assert not all_rows_overlap(ref_df[0], ref_df[4], join_columns="a", parallelism=2) - - -def test_all_rows_overlap_spark( - spark_session, - ref_df, - shuffle_df, -): - ref_df[0].iteritems = ref_df[0].items # pandas 2 compatibility - ref_df[4].iteritems = ref_df[4].items # pandas 2 compatibility - shuffle_df.iteritems = shuffle_df.items # pandas 2 compatibility - rdf = spark_session.createDataFrame(ref_df[0]) - rdf_copy = spark_session.createDataFrame(ref_df[0]) - rdf4 = spark_session.createDataFrame(ref_df[4]) - sdf = spark_session.createDataFrame(shuffle_df) - - assert all_rows_overlap(rdf, rdf_copy, join_columns="a") - assert all_rows_overlap(rdf, sdf, join_columns="a") - assert not all_rows_overlap(rdf, rdf4, join_columns="a") - assert all_rows_overlap( - spark_session.sql("SELECT 'a' AS a, 'b' AS b"), - spark_session.sql("SELECT 'a' AS a, 'b' AS b"), - join_columns="a", - ) - - -def test_all_rows_overlap_polars( - ref_df, - shuffle_df, -): - rdf = pl.from_pandas(ref_df[0]) - rdf_copy = pl.from_pandas(ref_df[0].copy()) - rdf4 = pl.from_pandas(ref_df[4]) - sdf = pl.from_pandas(shuffle_df) - - assert all_rows_overlap(rdf, rdf_copy, join_columns="a") - assert all_rows_overlap(rdf, sdf, join_columns="a") - assert not all_rows_overlap(rdf, rdf4, join_columns="a") - - -def test_all_rows_overlap_duckdb( - ref_df, - shuffle_df, -): - with duckdb.connect(): - rdf = duckdb.from_df(ref_df[0]) - rdf_copy = duckdb.from_df(ref_df[0].copy()) - rdf4 = duckdb.from_df(ref_df[4]) - sdf = duckdb.from_df(shuffle_df) - - assert all_rows_overlap(rdf, rdf_copy, join_columns="a") - assert all_rows_overlap(rdf, sdf, join_columns="a") - assert not all_rows_overlap(rdf, rdf4, join_columns="a") - assert all_rows_overlap( - duckdb.sql("SELECT 'a' AS a, 'b' AS b"), - duckdb.sql("SELECT 'a' AS a, 'b' AS b"), - join_columns="a", - ) diff --git a/tests/test_spark.py b/tests/test_spark.py index c577a667..2babfb7f 100644 --- a/tests/test_spark.py +++ b/tests/test_spark.py @@ -20,6 +20,9 @@ from decimal import Decimal import pytest + +pytest.importorskip("pyspark") + from pyspark.sql import Row, SparkSession from pyspark.sql.types import ( DateType, From d07d1259669e3cc445e96e44291d6a0d2ed477c1 Mon Sep 17 00:00:00 2001 From: JacobDawang Date: Thu, 7 Dec 2023 14:35:25 -0700 Subject: [PATCH 2/8] Bump setup python version --- .github/workflows/test-package.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-package.yml b/.github/workflows/test-package.yml index 71548d32..64460aba 100644 --- a/.github/workflows/test-package.yml +++ b/.github/workflows/test-package.yml @@ -26,7 +26,7 @@ jobs: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} @@ -60,7 +60,7 @@ jobs: - uses: actions/checkout@v3 - 
name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} From 55cce90c620f9eac92ac3c69414ebb0b66805f12 Mon Sep 17 00:00:00 2001 From: JacobDawang Date: Thu, 7 Dec 2023 14:36:44 -0700 Subject: [PATCH 3/8] Separate tests out --- tests/test_fugue/conftest.py | 88 +++++++++++ tests/test_fugue/test_duckdb.py | 125 ++++++++++++++++ tests/test_fugue/test_fuge_helpers.py | 5 + tests/test_fugue/test_fugue_pandas.py | 206 ++++++++++++++++++++++++++ tests/test_fugue/test_fugue_polars.py | 108 ++++++++++++++ tests/test_fugue/test_fugue_spark.py | 186 +++++++++++++++++++++++ 6 files changed, 718 insertions(+) create mode 100644 tests/test_fugue/conftest.py create mode 100644 tests/test_fugue/test_duckdb.py create mode 100644 tests/test_fugue/test_fuge_helpers.py create mode 100644 tests/test_fugue/test_fugue_pandas.py create mode 100644 tests/test_fugue/test_fugue_polars.py create mode 100644 tests/test_fugue/test_fugue_spark.py diff --git a/tests/test_fugue/conftest.py b/tests/test_fugue/conftest.py new file mode 100644 index 00000000..1176f1c1 --- /dev/null +++ b/tests/test_fugue/conftest.py @@ -0,0 +1,88 @@ +import pytest +import numpy as np +import pandas as pd + +@pytest.fixture +def ref_df(): + np.random.seed(0) + + df1 = pd.DataFrame( + dict( + a=np.random.randint(0, 10, 100), + b=np.random.rand(100), + c=np.random.choice(["aaa", "b_c", "csd"], 100), + ) + ) + df1_copy = df1.copy() + df2 = df1.copy().drop(columns=["c"]) + df3 = df1.copy().drop(columns=["a", "b"]) + df4 = pd.DataFrame( + dict( + a=np.random.randint(1, 12, 100), # shift the join col + b=np.random.rand(100), + c=np.random.choice(["aaa", "b_c", "csd"], 100), + ) + ) + return [df1, df1_copy, df2, df3, df4] + + +@pytest.fixture +def shuffle_df(ref_df): + return ref_df[0].sample(frac=1.0) + + +@pytest.fixture +def float_off_df(shuffle_df): + return shuffle_df.assign(b=shuffle_df.b + 0.0001) + + +@pytest.fixture +def upper_case_df(shuffle_df): + return shuffle_df.assign(c=shuffle_df.c.str.upper()) + + +@pytest.fixture +def space_df(shuffle_df): + return shuffle_df.assign(c=shuffle_df.c + " ") + + +@pytest.fixture +def upper_col_df(shuffle_df): + return shuffle_df.rename(columns={"a": "A"}) + + +@pytest.fixture +def simple_diff_df1(): + return pd.DataFrame(dict(aa=[0, 1, 0], bb=[2.1, 3.1, 4.1])).convert_dtypes() + + +@pytest.fixture +def simple_diff_df2(): + return pd.DataFrame( + dict(aa=[1, 0, 1], bb=[3.1, 4.1, 5.1], cc=["a", "b", "c"]) + ).convert_dtypes() + + +@pytest.fixture +def no_intersection_diff_df1(): + np.random.seed(0) + return pd.DataFrame(dict(x=["a"], y=[0.1])).convert_dtypes() + + +@pytest.fixture +def no_intersection_diff_df2(): + return pd.DataFrame(dict(x=["b"], y=[1.1])).convert_dtypes() + + +@pytest.fixture +def large_diff_df1(): + np.random.seed(0) + data = np.random.randint(0, 7, size=10000) + return pd.DataFrame({"x": data, "y": np.array([9] * 10000)}).convert_dtypes() + + +@pytest.fixture +def large_diff_df2(): + np.random.seed(0) + data = np.random.randint(6, 11, size=10000) + return pd.DataFrame({"x": data, "y": np.array([9] * 10000)}).convert_dtypes() diff --git a/tests/test_fugue/test_duckdb.py b/tests/test_fugue/test_duckdb.py new file mode 100644 index 00000000..232bc828 --- /dev/null +++ b/tests/test_fugue/test_duckdb.py @@ -0,0 +1,125 @@ +import pytest +from ordered_set import OrderedSet +from pytest import raises + +from datacompy import ( + all_columns_match, + all_rows_overlap, + 
intersect_columns, + is_match, + unq_columns, +) + +duckdb = pytest.importorskip("duckdb") + + +def test_is_match_duckdb( + ref_df, + shuffle_df, + float_off_df, + upper_case_df, + space_df, + upper_col_df, +): + with duckdb.connect(): + rdf = duckdb.from_df(ref_df[0]) + + assert is_match(rdf, shuffle_df, join_columns="a") + + assert not is_match(rdf, float_off_df, join_columns="a") + assert not is_match(rdf, float_off_df, abs_tol=0.00001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + + assert not is_match(rdf, upper_case_df, join_columns="a") + assert is_match(rdf, upper_case_df, join_columns="a", ignore_case=True) + + assert not is_match(rdf, space_df, join_columns="a") + assert is_match(rdf, space_df, join_columns="a", ignore_spaces=True) + + assert is_match(rdf, upper_col_df, join_columns="a") + with raises(AssertionError): + is_match(rdf, upper_col_df, join_columns="a", cast_column_names_lower=False) + + assert is_match( + duckdb.sql("SELECT 'a' AS a, 'b' AS b"), + duckdb.sql("SELECT 'a' AS a, 'b' AS b"), + join_columns="a", + ) + + +def test_unique_columns_duckdb(ref_df): + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + with duckdb.connect(): + ddf1 = duckdb.from_df(df1) + ddf1_copy = duckdb.from_df(df1_copy) + ddf2 = duckdb.from_df(df2) + ddf3 = duckdb.from_df(df3) + + assert unq_columns(ddf1, ddf1_copy) == OrderedSet() + assert unq_columns(ddf1, ddf2) == OrderedSet(["c"]) + assert unq_columns(ddf1, ddf3) == OrderedSet(["a", "b"]) + assert unq_columns(ddf1_copy, ddf1) == OrderedSet() + assert unq_columns(ddf3, ddf2) == OrderedSet(["c"]) + + +def test_intersect_columns_duckdb(ref_df): + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + with duckdb.connect(): + ddf1 = duckdb.from_df(df1) + ddf1_copy = duckdb.from_df(df1_copy) + ddf2 = duckdb.from_df(df2) + ddf3 = duckdb.from_df(df3) + + assert intersect_columns(ddf1, ddf1_copy) == OrderedSet(["a", "b", "c"]) + assert intersect_columns(ddf1, ddf2) == OrderedSet(["a", "b"]) + assert intersect_columns(ddf1, ddf3) == OrderedSet(["c"]) + assert intersect_columns(ddf1_copy, ddf1) == OrderedSet(["a", "b", "c"]) + assert intersect_columns(ddf3, ddf2) == OrderedSet() + + +def test_all_columns_match_duckdb(ref_df): + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + with duckdb.connect(): + df1 = duckdb.from_df(df1) + df1_copy = duckdb.from_df(df1_copy) + df2 = duckdb.from_df(df2) + df3 = duckdb.from_df(df3) + + assert all_columns_match(df1, df1_copy) is True + assert all_columns_match(df1, df2) is False + assert all_columns_match(df1, df3) is False + assert all_columns_match(df1_copy, df1) is True + assert all_columns_match(df3, df2) is False + + +def test_all_rows_overlap_duckdb( + ref_df, + shuffle_df, +): + with duckdb.connect(): + rdf = duckdb.from_df(ref_df[0]) + rdf_copy = duckdb.from_df(ref_df[0].copy()) + rdf4 = duckdb.from_df(ref_df[4]) + sdf = duckdb.from_df(shuffle_df) + + assert all_rows_overlap(rdf, rdf_copy, join_columns="a") + assert all_rows_overlap(rdf, sdf, join_columns="a") + assert not all_rows_overlap(rdf, rdf4, join_columns="a") + assert all_rows_overlap( + duckdb.sql("SELECT 'a' AS a, 'b' AS b"), + duckdb.sql("SELECT 'a' AS a, 'b' AS b"), + join_columns="a", + ) diff --git a/tests/test_fugue/test_fuge_helpers.py b/tests/test_fugue/test_fuge_helpers.py new file mode 100644 index 00000000..67d8aafd --- /dev/null +++ 
b/tests/test_fugue/test_fuge_helpers.py @@ -0,0 +1,5 @@ +def _compare_report(expected, actual, truncate=False): + if truncate: + expected = expected.split("Sample Rows", 1)[0] + actual = actual.split("Sample Rows", 1)[0] + assert expected == actual diff --git a/tests/test_fugue/test_fugue_pandas.py b/tests/test_fugue/test_fugue_pandas.py new file mode 100644 index 00000000..cdf22e2b --- /dev/null +++ b/tests/test_fugue/test_fugue_pandas.py @@ -0,0 +1,206 @@ +# +# Copyright 2023 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Testing out the fugue is_match functionality +""" +from io import StringIO +import pandas as pd +from ordered_set import OrderedSet +from pytest import raises + +from datacompy import ( + Compare, + all_columns_match, + all_rows_overlap, + intersect_columns, + is_match, + report, + unq_columns, +) + +from test_fuge_helpers import _compare_report + + +def test_is_match_native( + ref_df, + shuffle_df, + float_off_df, + upper_case_df, + space_df, + upper_col_df, +): + # defaults to Compare class + assert is_match(ref_df[0], ref_df[0].copy(), join_columns="a") + assert not is_match(ref_df[0], shuffle_df, join_columns="a") + # Fugue + assert is_match(ref_df[0], shuffle_df, join_columns="a", parallelism=2) + + assert not is_match(ref_df[0], float_off_df, join_columns="a", parallelism=2) + assert not is_match( + ref_df[0], float_off_df, abs_tol=0.00001, join_columns="a", parallelism=2 + ) + assert is_match( + ref_df[0], float_off_df, abs_tol=0.001, join_columns="a", parallelism=2 + ) + assert is_match( + ref_df[0], float_off_df, abs_tol=0.001, join_columns="a", parallelism=2 + ) + + assert not is_match(ref_df[0], upper_case_df, join_columns="a", parallelism=2) + assert is_match( + ref_df[0], upper_case_df, join_columns="a", ignore_case=True, parallelism=2 + ) + + assert not is_match(ref_df[0], space_df, join_columns="a", parallelism=2) + assert is_match( + ref_df[0], space_df, join_columns="a", ignore_spaces=True, parallelism=2 + ) + + assert is_match(ref_df[0], upper_col_df, join_columns="a", parallelism=2) + + with raises(AssertionError): + is_match( + ref_df[0], + upper_col_df, + join_columns="a", + cast_column_names_lower=False, + parallelism=2, + ) + + + + + +def test_doc_case(): + data1 = """acct_id,dollar_amt,name,float_fld,date_fld + 10000001234,123.45,George Maharis,14530.1555,2017-01-01 + 10000001235,0.45,Michael Bluth,1,2017-01-01 + 10000001236,1345,George Bluth,,2017-01-01 + 10000001237,123456,Bob Loblaw,345.12,2017-01-01 + 10000001239,1.05,Lucille Bluth,,2017-01-01 + """ + + data2 = """acct_id,dollar_amt,name,float_fld + 10000001234,123.4,George Michael Bluth,14530.155 + 10000001235,0.45,Michael Bluth, + 10000001236,1345,George Bluth,1 + 10000001237,123456,Robert Loblaw,345.12 + 10000001238,1.05,Loose Seal Bluth,111 + """ + + df1 = pd.read_csv(StringIO(data1)) + df2 = pd.read_csv(StringIO(data2)) + + assert not is_match( + df1, + df2, + join_columns="acct_id", + abs_tol=0, + rel_tol=0, + df1_name="Original", + df2_name="New", + 
parallelism=2, + ) + +def test_report_pandas( + simple_diff_df1, + simple_diff_df2, + no_intersection_diff_df1, + no_intersection_diff_df2, + large_diff_df1, + large_diff_df2, +): + comp = Compare(simple_diff_df1, simple_diff_df2, join_columns=["aa"]) + a = report(simple_diff_df1, simple_diff_df2, ["aa"]) + _compare_report(comp.report(), a) + a = report(simple_diff_df1, simple_diff_df2, "aa", parallelism=2) + _compare_report(comp.report(), a) + + comp = Compare( + no_intersection_diff_df1, no_intersection_diff_df2, join_columns=["x"] + ) + a = report(no_intersection_diff_df1, no_intersection_diff_df2, ["x"]) + _compare_report(comp.report(), a) + a = report(no_intersection_diff_df1, no_intersection_diff_df2, "x", parallelism=2) + _compare_report(comp.report(), a) + + # due to https://github.com/capitalone/datacompy/issues/221 + # we can have y as a constant to ensure all the x matches are equal + + comp = Compare(large_diff_df1, large_diff_df2, join_columns=["x"]) + a = report(large_diff_df1, large_diff_df2, ["x"]) + _compare_report(comp.report(), a, truncate=True) + a = report(large_diff_df1, large_diff_df2, "x", parallelism=2) + _compare_report(comp.report(), a, truncate=True) + +def test_unique_columns_native(ref_df): + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + assert unq_columns(df1, df1.copy()) == OrderedSet() + assert unq_columns(df1, df2) == OrderedSet(["c"]) + assert unq_columns(df1, df3) == OrderedSet(["a", "b"]) + assert unq_columns(df1.copy(), df1) == OrderedSet() + assert unq_columns(df3, df2) == OrderedSet(["c"]) + + + + + +def test_intersect_columns_native(ref_df): + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + assert intersect_columns(df1, df1_copy) == OrderedSet(["a", "b", "c"]) + assert intersect_columns(df1, df2) == OrderedSet(["a", "b"]) + assert intersect_columns(df1, df3) == OrderedSet(["c"]) + assert intersect_columns(df1_copy, df1) == OrderedSet(["a", "b", "c"]) + assert intersect_columns(df3, df2) == OrderedSet() + + + + +def test_all_columns_match_native(ref_df): + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + assert all_columns_match(df1, df1_copy) is True + assert all_columns_match(df1, df2) is False + assert all_columns_match(df1, df3) is False + assert all_columns_match(df1_copy, df1) is True + assert all_columns_match(df3, df2) is False + + + + +def test_all_rows_overlap_native( + ref_df, + shuffle_df, +): + # defaults to Compare class + assert all_rows_overlap(ref_df[0], ref_df[0].copy(), join_columns="a") + assert all_rows_overlap(ref_df[0], shuffle_df, join_columns="a") + assert not all_rows_overlap(ref_df[0], ref_df[4], join_columns="a") + # Fugue + assert all_rows_overlap(ref_df[0], shuffle_df, join_columns="a", parallelism=2) + assert not all_rows_overlap(ref_df[0], ref_df[4], join_columns="a", parallelism=2) + diff --git a/tests/test_fugue/test_fugue_polars.py b/tests/test_fugue/test_fugue_polars.py new file mode 100644 index 00000000..65eb3925 --- /dev/null +++ b/tests/test_fugue/test_fugue_polars.py @@ -0,0 +1,108 @@ +import pytest +from ordered_set import OrderedSet +from pytest import raises + +from datacompy import ( + all_columns_match, + all_rows_overlap, + intersect_columns, + is_match, + unq_columns, +) +pl = pytest.importorskip("polars") + +def test_is_match_polars( + ref_df, + shuffle_df, + float_off_df, + upper_case_df, + space_df, + upper_col_df, +): + + rdf = pl.from_pandas(ref_df[0]) + + assert is_match(rdf, shuffle_df, join_columns="a") 
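+    # shuffle_df only reorders rows of ref_df[0]; joining on "a" realigns them, so this matches
+    # float_off_df (used below) shifts column b by 0.0001, so matching hinges on abs_tol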
+ + assert not is_match(rdf, float_off_df, join_columns="a") + assert not is_match(rdf, float_off_df, abs_tol=0.00001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + + assert not is_match(rdf, upper_case_df, join_columns="a") + assert is_match(rdf, upper_case_df, join_columns="a", ignore_case=True) + + assert not is_match(rdf, space_df, join_columns="a") + assert is_match(rdf, space_df, join_columns="a", ignore_spaces=True) + + assert is_match(rdf, upper_col_df, join_columns="a") + with raises(AssertionError): + is_match(rdf, upper_col_df, join_columns="a", cast_column_names_lower=False) + +def test_unique_columns_polars(ref_df): + + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + pdf1 = pl.from_pandas(df1) + pdf1_copy = pl.from_pandas(df1_copy) + pdf2 = pl.from_pandas(df2) + pdf3 = pl.from_pandas(df3) + + assert unq_columns(pdf1, pdf1_copy) == OrderedSet() + assert unq_columns(pdf1, pdf2) == OrderedSet(["c"]) + assert unq_columns(pdf1, pdf3) == OrderedSet(["a", "b"]) + assert unq_columns(pdf1_copy, pdf1) == OrderedSet() + assert unq_columns(pdf3, pdf2) == OrderedSet(["c"]) + +def test_intersect_columns_polars(ref_df): + + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + pdf1 = pl.from_pandas(df1) + pdf1_copy = pl.from_pandas(df1_copy) + pdf2 = pl.from_pandas(df2) + pdf3 = pl.from_pandas(df3) + + assert intersect_columns(pdf1, pdf1_copy) == OrderedSet(["a", "b", "c"]) + assert intersect_columns(pdf1, pdf2) == OrderedSet(["a", "b"]) + assert intersect_columns(pdf1, pdf3) == OrderedSet(["c"]) + assert intersect_columns(pdf1_copy, pdf1) == OrderedSet(["a", "b", "c"]) + assert intersect_columns(pdf3, pdf2) == OrderedSet() + +def test_all_columns_match_polars(ref_df): + + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + df1 = pl.from_pandas(df1) + df1_copy = pl.from_pandas(df1_copy) + df2 = pl.from_pandas(df2) + df3 = pl.from_pandas(df3) + + assert all_columns_match(df1, df1_copy) is True + assert all_columns_match(df1, df2) is False + assert all_columns_match(df1, df3) is False + assert all_columns_match(df1_copy, df1) is True + assert all_columns_match(df3, df2) is False + +def test_all_rows_overlap_polars( + ref_df, + shuffle_df, +): + + rdf = pl.from_pandas(ref_df[0]) + rdf_copy = pl.from_pandas(ref_df[0].copy()) + rdf4 = pl.from_pandas(ref_df[4]) + sdf = pl.from_pandas(shuffle_df) + + assert all_rows_overlap(rdf, rdf_copy, join_columns="a") + assert all_rows_overlap(rdf, sdf, join_columns="a") + assert not all_rows_overlap(rdf, rdf4, join_columns="a") diff --git a/tests/test_fugue/test_fugue_spark.py b/tests/test_fugue/test_fugue_spark.py new file mode 100644 index 00000000..fb5b3a43 --- /dev/null +++ b/tests/test_fugue/test_fugue_spark.py @@ -0,0 +1,186 @@ +import pytest +from datacompy import ( + Compare, + all_columns_match, + all_rows_overlap, + intersect_columns, + is_match, + report, + unq_columns, +) +from ordered_set import OrderedSet +from pytest import raises + +from test_fuge_helpers import _compare_report + +pyspark = pytest.importorskip("pyspark") + +def test_is_match_spark( + spark_session, + ref_df, + shuffle_df, + float_off_df, + upper_case_df, + space_df, + upper_col_df, +): + ref_df[0].iteritems = ref_df[0].items # pandas 2 compatibility + rdf = spark_session.createDataFrame(ref_df[0]) + + assert is_match(rdf, shuffle_df, join_columns="a") + + assert not is_match(rdf, 
float_off_df, join_columns="a") + assert not is_match(rdf, float_off_df, abs_tol=0.00001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + assert is_match(rdf, float_off_df, abs_tol=0.001, join_columns="a") + + assert not is_match(rdf, upper_case_df, join_columns="a") + assert is_match(rdf, upper_case_df, join_columns="a", ignore_case=True) + + assert not is_match(rdf, space_df, join_columns="a") + assert is_match(rdf, space_df, join_columns="a", ignore_spaces=True) + + assert is_match(rdf, upper_col_df, join_columns="a") + + with raises(AssertionError): + is_match(rdf, upper_col_df, join_columns="a", cast_column_names_lower=False) + + assert is_match( + spark_session.sql("SELECT 'a' AS a, 'b' AS b"), + spark_session.sql("SELECT 'a' AS a, 'b' AS b"), + join_columns="a", + ) + + +def test_report_spark( + spark_session, + simple_diff_df1, + simple_diff_df2, + no_intersection_diff_df1, + no_intersection_diff_df2, + large_diff_df1, + large_diff_df2, +): + simple_diff_df1.iteritems = simple_diff_df1.items # pandas 2 compatibility + simple_diff_df2.iteritems = simple_diff_df2.items # pandas 2 compatibility + no_intersection_diff_df1.iteritems = ( + no_intersection_diff_df1.items + ) # pandas 2 compatibility + no_intersection_diff_df2.iteritems = ( + no_intersection_diff_df2.items + ) # pandas 2 compatibility + large_diff_df1.iteritems = large_diff_df1.items # pandas 2 compatibility + large_diff_df2.iteritems = large_diff_df2.items # pandas 2 compatibility + + df1 = spark_session.createDataFrame(simple_diff_df1) + df2 = spark_session.createDataFrame(simple_diff_df2) + comp = Compare(simple_diff_df1, simple_diff_df2, join_columns="aa") + a = report(df1, df2, ["aa"]) + _compare_report(comp.report(), a) + + df1 = spark_session.createDataFrame(no_intersection_diff_df1) + df2 = spark_session.createDataFrame(no_intersection_diff_df2) + comp = Compare(no_intersection_diff_df1, no_intersection_diff_df2, join_columns="x") + a = report(df1, df2, ["x"]) + _compare_report(comp.report(), a) + + # due to https://github.com/capitalone/datacompy/issues/221 + # we can have y as a constant to ensure all the x matches are equal + + df1 = spark_session.createDataFrame(large_diff_df1) + df2 = spark_session.createDataFrame(large_diff_df2) + comp = Compare(large_diff_df1, large_diff_df2, join_columns="x") + a = report(df1, df2, ["x"]) + _compare_report(comp.report(), a, truncate=True) + + +def test_unique_columns_spark(spark_session, ref_df): + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + df1.iteritems = df1.items # pandas 2 compatibility + df1_copy.iteritems = df1_copy.items # pandas 2 compatibility + df2.iteritems = df2.items # pandas 2 compatibility + df3.iteritems = df3.items # pandas 2 compatibility + + sdf1 = spark_session.createDataFrame(df1) + sdf1_copy = spark_session.createDataFrame(df1_copy) + sdf2 = spark_session.createDataFrame(df2) + sdf3 = spark_session.createDataFrame(df3) + + assert unq_columns(sdf1, sdf1_copy) == OrderedSet() + assert unq_columns(sdf1, sdf2) == OrderedSet(["c"]) + assert unq_columns(sdf1, sdf3) == OrderedSet(["a", "b"]) + assert unq_columns(sdf1_copy, sdf1) == OrderedSet() + assert unq_columns(sdf3, sdf2) == OrderedSet(["c"]) + + +def test_intersect_columns_spark(spark_session, ref_df): + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + df1.iteritems = df1.items # pandas 2 compatibility + df1_copy.iteritems = df1_copy.items # pandas 2 compatibility + df2.iteritems = df2.items # 
pandas 2 compatibility + df3.iteritems = df3.items # pandas 2 compatibility + + sdf1 = spark_session.createDataFrame(df1) + sdf1_copy = spark_session.createDataFrame(df1_copy) + sdf2 = spark_session.createDataFrame(df2) + sdf3 = spark_session.createDataFrame(df3) + + assert intersect_columns(sdf1, sdf1_copy) == OrderedSet(["a", "b", "c"]) + assert intersect_columns(sdf1, sdf2) == OrderedSet(["a", "b"]) + assert intersect_columns(sdf1, sdf3) == OrderedSet(["c"]) + assert intersect_columns(sdf1_copy, sdf1) == OrderedSet(["a", "b", "c"]) + assert intersect_columns(sdf3, sdf2) == OrderedSet() + + +def test_all_columns_match_spark(spark_session, ref_df): + df1 = ref_df[0] + df1_copy = ref_df[1] + df2 = ref_df[2] + df3 = ref_df[3] + + df1.iteritems = df1.items # pandas 2 compatibility + df1_copy.iteritems = df1_copy.items # pandas 2 compatibility + df2.iteritems = df2.items # pandas 2 compatibility + df3.iteritems = df3.items # pandas 2 compatibility + + df1 = spark_session.createDataFrame(df1) + df1_copy = spark_session.createDataFrame(df1_copy) + df2 = spark_session.createDataFrame(df2) + df3 = spark_session.createDataFrame(df3) + + assert all_columns_match(df1, df1_copy) is True + assert all_columns_match(df1, df2) is False + assert all_columns_match(df1, df3) is False + assert all_columns_match(df1_copy, df1) is True + assert all_columns_match(df3, df2) is False + + +def test_all_rows_overlap_spark( + spark_session, + ref_df, + shuffle_df, +): + ref_df[0].iteritems = ref_df[0].items # pandas 2 compatibility + ref_df[4].iteritems = ref_df[4].items # pandas 2 compatibility + shuffle_df.iteritems = shuffle_df.items # pandas 2 compatibility + rdf = spark_session.createDataFrame(ref_df[0]) + rdf_copy = spark_session.createDataFrame(ref_df[0]) + rdf4 = spark_session.createDataFrame(ref_df[4]) + sdf = spark_session.createDataFrame(shuffle_df) + + assert all_rows_overlap(rdf, rdf_copy, join_columns="a") + assert all_rows_overlap(rdf, sdf, join_columns="a") + assert not all_rows_overlap(rdf, rdf4, join_columns="a") + assert all_rows_overlap( + spark_session.sql("SELECT 'a' AS a, 'b' AS b"), + spark_session.sql("SELECT 'a' AS a, 'b' AS b"), + join_columns="a", + ) From 7f384242497162185398e2b56b7ec37f8c2b1609 Mon Sep 17 00:00:00 2001 From: JacobDawang Date: Thu, 7 Dec 2023 14:38:00 -0700 Subject: [PATCH 4/8] Install datacompy step --- .github/workflows/test-package.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-package.yml b/.github/workflows/test-package.yml index 64460aba..5de6e485 100644 --- a/.github/workflows/test-package.yml +++ b/.github/workflows/test-package.yml @@ -36,7 +36,7 @@ jobs: java-version: '8' distribution: 'adopt' - - name: Install Spark + - name: Install Spark and datacompy run: | python -m pip install --upgrade pip python -m pip install pytest pytest-spark pypandoc @@ -64,7 +64,7 @@ jobs: with: python-version: ${{ matrix.python-version }} - - name: Install Spark + - name: Install datacompy run: | python -m pip install --upgrade pip python -m pip install .[tests] From 116c3a7dd4c196155e41885407d384d732a92c7c Mon Sep 17 00:00:00 2001 From: JacobDawang Date: Thu, 7 Dec 2023 14:40:36 -0700 Subject: [PATCH 5/8] Drop 3.0 and add 3.5 --- .github/workflows/test-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-package.yml b/.github/workflows/test-package.yml index 5de6e485..5b361b38 100644 --- a/.github/workflows/test-package.yml +++ b/.github/workflows/test-package.yml @@ -17,7 +17,7 @@ 
jobs: fail-fast: false matrix: python-version: [3.8, 3.9, '3.10'] - spark-version: [3.0.3, 3.1.3, 3.2.3, 3.3.1, 3.4.0] + spark-version: [3.1.3, 3.2.3, 3.3.1, 3.4.0, 3.5.0] env: PYTHON_VERSION: ${{ matrix.python-version }} SPARK_VERSION: ${{ matrix.spark-version }} From ce700a391548b7c8c15e88446910a4865901cb1a Mon Sep 17 00:00:00 2001 From: JacobDawang Date: Thu, 7 Dec 2023 14:44:04 -0700 Subject: [PATCH 6/8] Add fugue no spark --- .github/workflows/test-package.yml | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-package.yml b/.github/workflows/test-package.yml index 5b361b38..b9555ae2 100644 --- a/.github/workflows/test-package.yml +++ b/.github/workflows/test-package.yml @@ -10,7 +10,7 @@ on: branches: [develop, main] jobs: - test-complete-install: + test-dev-install: runs-on: ubuntu-latest strategy: @@ -41,7 +41,7 @@ jobs: python -m pip install --upgrade pip python -m pip install pytest pytest-spark pypandoc python -m pip install pyspark==${{ matrix.spark-version }} - python -m pip install .[tests,tests-spark,spark] + python -m pip install .[dev] - name: Test with pytest run: | python -m pytest tests/ @@ -71,3 +71,29 @@ jobs: - name: Test with pytest run: | python -m pytest tests/ + + test-fugue-install-no-spark: + + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: [3.8, 3.9, '3.10'] + env: + PYTHON_VERSION: ${{ matrix.python-version }} + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install datacompy + run: | + python -m pip install --upgrade pip + python -m pip install .[tests,duckdb,polars,dask,ray] + - name: Test with pytest + run: | + python -m pytest tests/ From bfcb1cb06713b264f3483822b226e476ef760d24 Mon Sep 17 00:00:00 2001 From: JacobDawang Date: Thu, 7 Dec 2023 14:53:10 -0700 Subject: [PATCH 7/8] Add permissions --- .github/workflows/test-package.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/test-package.yml b/.github/workflows/test-package.yml index b9555ae2..b5ec2ea2 100644 --- a/.github/workflows/test-package.yml +++ b/.github/workflows/test-package.yml @@ -9,6 +9,9 @@ on: pull_request: branches: [develop, main] +permissions: + contents: read + jobs: test-dev-install: From e5128035146a19e1594d013e04b748f960475778 Mon Sep 17 00:00:00 2001 From: JacobDawang Date: Thu, 7 Dec 2023 14:53:40 -0700 Subject: [PATCH 8/8] Format and docstrings --- tests/test_fugue/conftest.py | 1 + tests/test_fugue/test_duckdb.py | 15 +++++++++++++++ tests/test_fugue/test_fuge_helpers.py | 5 ----- tests/test_fugue/test_fugue_helpers.py | 22 ++++++++++++++++++++++ tests/test_fugue/test_fugue_pandas.py | 20 ++++---------------- tests/test_fugue/test_fugue_polars.py | 26 +++++++++++++++++++++----- tests/test_fugue/test_fugue_spark.py | 18 +++++++++++++++++- 7 files changed, 80 insertions(+), 27 deletions(-) delete mode 100644 tests/test_fugue/test_fuge_helpers.py create mode 100644 tests/test_fugue/test_fugue_helpers.py diff --git a/tests/test_fugue/conftest.py b/tests/test_fugue/conftest.py index 1176f1c1..6a5683d2 100644 --- a/tests/test_fugue/conftest.py +++ b/tests/test_fugue/conftest.py @@ -2,6 +2,7 @@ import numpy as np import pandas as pd + @pytest.fixture def ref_df(): np.random.seed(0) diff --git a/tests/test_fugue/test_duckdb.py b/tests/test_fugue/test_duckdb.py index 232bc828..39465f42 100644 --- 
a/tests/test_fugue/test_duckdb.py +++ b/tests/test_fugue/test_duckdb.py @@ -1,3 +1,18 @@ +# +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test fugue functionality with duckdb.""" import pytest from ordered_set import OrderedSet from pytest import raises diff --git a/tests/test_fugue/test_fuge_helpers.py b/tests/test_fugue/test_fuge_helpers.py deleted file mode 100644 index 67d8aafd..00000000 --- a/tests/test_fugue/test_fuge_helpers.py +++ /dev/null @@ -1,5 +0,0 @@ -def _compare_report(expected, actual, truncate=False): - if truncate: - expected = expected.split("Sample Rows", 1)[0] - actual = actual.split("Sample Rows", 1)[0] - assert expected == actual diff --git a/tests/test_fugue/test_fugue_helpers.py b/tests/test_fugue/test_fugue_helpers.py new file mode 100644 index 00000000..2c2d56a9 --- /dev/null +++ b/tests/test_fugue/test_fugue_helpers.py @@ -0,0 +1,22 @@ +# +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""One test helper for fugue reports.""" + +def _compare_report(expected, actual, truncate=False): + """Compare datacompy reports.""" + if truncate: + expected = expected.split("Sample Rows", 1)[0] + actual = actual.split("Sample Rows", 1)[0] + assert expected == actual diff --git a/tests/test_fugue/test_fugue_pandas.py b/tests/test_fugue/test_fugue_pandas.py index cdf22e2b..3bacb30e 100644 --- a/tests/test_fugue/test_fugue_pandas.py +++ b/tests/test_fugue/test_fugue_pandas.py @@ -12,10 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -""" -Testing out the fugue is_match functionality -""" +"""Test the fugue functionality with pandas.""" from io import StringIO import pandas as pd from ordered_set import OrderedSet @@ -31,7 +28,7 @@ unq_columns, ) -from test_fuge_helpers import _compare_report +from test_fugue_helpers import _compare_report def test_is_match_native( @@ -81,9 +78,6 @@ def test_is_match_native( ) - - - def test_doc_case(): data1 = """acct_id,dollar_amt,name,float_fld,date_fld 10000001234,123.45,George Maharis,14530.1555,2017-01-01 @@ -115,6 +109,7 @@ def test_doc_case(): parallelism=2, ) + def test_report_pandas( simple_diff_df1, simple_diff_df2, @@ -146,6 +141,7 @@ def test_report_pandas( a = report(large_diff_df1, large_diff_df2, "x", parallelism=2) _compare_report(comp.report(), a, truncate=True) + def test_unique_columns_native(ref_df): df1 = ref_df[0] df1_copy = ref_df[1] @@ -159,9 +155,6 @@ def test_unique_columns_native(ref_df): assert unq_columns(df3, df2) == OrderedSet(["c"]) - - - def test_intersect_columns_native(ref_df): df1 = ref_df[0] df1_copy = ref_df[1] @@ -175,8 +168,6 @@ def test_intersect_columns_native(ref_df): assert intersect_columns(df3, df2) == OrderedSet() - - def test_all_columns_match_native(ref_df): df1 = ref_df[0] df1_copy = ref_df[1] @@ -190,8 +181,6 @@ def test_all_columns_match_native(ref_df): assert all_columns_match(df3, df2) is False - - def test_all_rows_overlap_native( ref_df, shuffle_df, @@ -203,4 +192,3 @@ def test_all_rows_overlap_native( # Fugue assert all_rows_overlap(ref_df[0], shuffle_df, join_columns="a", parallelism=2) assert not all_rows_overlap(ref_df[0], ref_df[4], join_columns="a", parallelism=2) - diff --git a/tests/test_fugue/test_fugue_polars.py b/tests/test_fugue/test_fugue_polars.py index 65eb3925..97b88c7e 100644 --- a/tests/test_fugue/test_fugue_polars.py +++ b/tests/test_fugue/test_fugue_polars.py @@ -1,3 +1,18 @@ +# +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Test fugue and polars.""" import pytest from ordered_set import OrderedSet from pytest import raises @@ -9,8 +24,10 @@ is_match, unq_columns, ) + pl = pytest.importorskip("polars") + def test_is_match_polars( ref_df, shuffle_df, @@ -19,7 +36,6 @@ def test_is_match_polars( space_df, upper_col_df, ): - rdf = pl.from_pandas(ref_df[0]) assert is_match(rdf, shuffle_df, join_columns="a") @@ -39,8 +55,8 @@ def test_is_match_polars( with raises(AssertionError): is_match(rdf, upper_col_df, join_columns="a", cast_column_names_lower=False) -def test_unique_columns_polars(ref_df): +def test_unique_columns_polars(ref_df): df1 = ref_df[0] df1_copy = ref_df[1] df2 = ref_df[2] @@ -57,8 +73,8 @@ def test_unique_columns_polars(ref_df): assert unq_columns(pdf1_copy, pdf1) == OrderedSet() assert unq_columns(pdf3, pdf2) == OrderedSet(["c"]) -def test_intersect_columns_polars(ref_df): +def test_intersect_columns_polars(ref_df): df1 = ref_df[0] df1_copy = ref_df[1] df2 = ref_df[2] @@ -75,8 +91,8 @@ def test_intersect_columns_polars(ref_df): assert intersect_columns(pdf1_copy, pdf1) == OrderedSet(["a", "b", "c"]) assert intersect_columns(pdf3, pdf2) == OrderedSet() -def test_all_columns_match_polars(ref_df): +def test_all_columns_match_polars(ref_df): df1 = ref_df[0] df1_copy = ref_df[1] df2 = ref_df[2] @@ -93,11 +109,11 @@ def test_all_columns_match_polars(ref_df): assert all_columns_match(df1_copy, df1) is True assert all_columns_match(df3, df2) is False + def test_all_rows_overlap_polars( ref_df, shuffle_df, ): - rdf = pl.from_pandas(ref_df[0]) rdf_copy = pl.from_pandas(ref_df[0].copy()) rdf4 = pl.from_pandas(ref_df[4]) diff --git a/tests/test_fugue/test_fugue_spark.py b/tests/test_fugue/test_fugue_spark.py index fb5b3a43..0ef86c24 100644 --- a/tests/test_fugue/test_fugue_spark.py +++ b/tests/test_fugue/test_fugue_spark.py @@ -1,3 +1,18 @@ +# +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test fugue and spark.""" import pytest from datacompy import ( Compare, @@ -11,10 +26,11 @@ from ordered_set import OrderedSet from pytest import raises -from test_fuge_helpers import _compare_report +from test_fugue_helpers import _compare_report pyspark = pytest.importorskip("pyspark") + def test_is_match_spark( spark_session, ref_df,