Skip to content

Commit

Permalink
Categorical polars update
Browse files Browse the repository at this point in the history
  • Loading branch information
abajpai15 committed Apr 23, 2024
1 parent b3633eb commit 8f8b528
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 45 deletions.
24 changes: 16 additions & 8 deletions dataprofiler/profilers/categorical_column_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import cast

import datasketches
import polars as pl
from pandas import DataFrame, Series

from .. import dp_logging
Expand Down Expand Up @@ -474,7 +475,7 @@ def _check_stop_condition_is_met(self, sample_size: int, unqiue_ratio: float):
return True
return False

def _update_stop_condition(self, data: DataFrame):
def _update_stop_condition(self, data: DataFrame | pl.DataFrame):
"""Return value stop_condition_is_met given stop conditions.
:param data: Dataframe currently being processed by categorical profiler
Expand All @@ -497,8 +498,8 @@ def _get_categories_cms(self, df_series, len_df):
"""Return count min sketch and heavy hitters for both the batch and stream case.
:param df_series: Series currently being processed by categorical profiler
:type df_series: Series
:param len_df: the total number of samples iin df_series
:type df_series: polars.Series
:param len_df: the total number of samples in df_series
:type len_df: int
:return: cms, heavy_hitter_dict, missing_heavy_hitter_dict
"""
Expand Down Expand Up @@ -601,13 +602,13 @@ def _get_categories_full(self, df_series) -> dict:
:return: dict of counts for each unique value
:rtype: dict
"""
category_count: dict = df_series.value_counts(dropna=False).to_dict()
category_count: dict = Series(df_series).value_counts(dropna=False).to_dict()
return category_count

@BaseColumnProfiler._timeit(name="categories")
def _update_categories(
self,
df_series: DataFrame,
df_series: DataFrame | pl.DataFrame,
prev_dependent_properties: dict = None,
subset_properties: dict = None,
) -> None:
Expand Down Expand Up @@ -657,7 +658,9 @@ def _update_categories(
if self._stop_condition_is_met:
self._categories = {}

def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
def _update_helper(
self, df_series_clean: Series | pl.Series, profile: dict
) -> None:
"""
Update col profile properties with clean dataset and its known profile.
Expand All @@ -669,7 +672,7 @@ def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
"""
self._update_column_base_properties(profile)

def update(self, df_series: Series) -> CategoricalColumn:
def update(self, df_series: pl.Series | Series) -> CategoricalColumn:
"""
Update the column profile.
Expand All @@ -682,12 +685,17 @@ def update(self, df_series: Series) -> CategoricalColumn:
if len(df_series) == 0 or self._stop_condition_is_met:
return self

if isinstance(df_series, pl.Series):
pandas_df = df_series.to_pandas()
else:
pandas_df = df_series

profile = dict(sample_size=len(df_series))
CategoricalColumn._update_categories(self, df_series)
BaseColumnProfiler._perform_property_calcs(
self,
self.__calculations,
df_series=df_series,
df_series=pandas_df,
prev_dependent_properties={},
subset_properties=profile,
)
Expand Down
75 changes: 38 additions & 37 deletions dataprofiler/tests/profilers/test_categorical_column_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import numpy as np
import pandas as pd
import polars as pl

from dataprofiler.profilers import CategoricalColumn
from dataprofiler.profilers.json_decoder import load_column_profile
Expand Down Expand Up @@ -51,7 +52,7 @@ def test_correct_categorical_model_string(self):
self.assertCountEqual(categories, profile.categories)

def test_stop_condition_is_met_initially(self):
dataset = pd.Series(["a"] * 10 + ["b"] * 10 + ["c"] * 10 + ["d"] * 10)
dataset = pl.Series(["a"] * 10 + ["b"] * 10 + ["c"] * 10 + ["d"] * 10)
profile = CategoricalColumn("test dataset")
profile.max_sample_size_to_check_stop_condition = 0
profile.stop_condition_unique_value_ratio = 0
Expand Down Expand Up @@ -368,7 +369,7 @@ def test_categorical_mapping(self):
self.assertNotEqual(num_nan_count, len(column_profile.null_types_index["NaN"]))

def test_true_categorical_report(self):
df_categorical = pd.Series(
df_categorical = pl.Series(
[
"a",
"a",
Expand Down Expand Up @@ -415,7 +416,7 @@ def test_true_categorical_report(self):
self.assertEqual(report, expected_profile)

def test_false_categorical_report(self):
df_non_categorical = pd.Series(list(map(str, range(0, 20))))
df_non_categorical = pl.Series(list(map(str, range(0, 20))))
profile = CategoricalColumn(df_non_categorical.name)
profile.update(df_non_categorical)

Expand All @@ -433,7 +434,7 @@ def test_false_categorical_report(self):
self.assertEqual(report, expected_profile)

def test_report(self):
df_non_categorical = pd.Series(list(map(str, range(0, 20))))
df_non_categorical = pl.Series(list(map(str, range(0, 20))))
profile = CategoricalColumn(df_non_categorical.name)
profile.update(df_non_categorical)

Expand Down Expand Up @@ -681,32 +682,32 @@ def test_categorical_merge(self):

def test_gini_impurity(self):
# Normal test
df_categorical = pd.Series(["y", "y", "y", "y", "n", "n", "n"])
df_categorical = pl.Series(["y", "y", "y", "y", "n", "n", "n"])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)
expected_val = ((4 / 7) * (3 / 7)) + ((4 / 7) * (3 / 7))
self.assertAlmostEqual(profile.gini_impurity, expected_val)

# One class only test
df_categorical = pd.Series(["y", "y", "y", "y", "y", "y", "y"])
df_categorical = pl.Series(["y", "y", "y", "y", "y", "y", "y"])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)
expected_val = 0
self.assertEqual(profile.gini_impurity, expected_val)

# Empty test
df_categorical = pd.Series([])
df_categorical = pl.Series([])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)
self.assertEqual(profile.gini_impurity, None)

def test_categorical_diff(self):
# test psi new category in another profile
df_categorical = pd.Series(["y", "y", "y", "y", "n", "n", "n"])
df_categorical = pl.Series(["y", "y", "y", "y", "n", "n", "n"])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)

df_categorical = pd.Series(["y", "maybe", "y", "y", "n", "n", "maybe"])
df_categorical = pl.Series(["y", "maybe", "y", "y", "n", "n", "maybe"])
profile2 = CategoricalColumn(df_categorical.name)
profile2.update(df_categorical)

Expand Down Expand Up @@ -734,7 +735,7 @@ def test_categorical_diff(self):
self.assertDictEqual(expected_diff, actual_diff)

# Test with one categorical column matching
df_not_categorical = pd.Series(
df_not_categorical = pl.Series(
[
"THIS",
"is",
Expand All @@ -759,11 +760,11 @@ def test_categorical_diff(self):
self.assertDictEqual(expected_diff, profile.diff(profile2))

# Test diff with psi enabled
df_categorical = pd.Series(["y", "y", "y", "y", "n", "n", "n", "maybe"])
df_categorical = pl.Series(["y", "y", "y", "y", "n", "n", "n", "maybe"])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)

df_categorical = pd.Series(["y", "maybe", "y", "y", "n", "n", "maybe"])
df_categorical = pl.Series(["y", "maybe", "y", "y", "n", "n", "maybe"])
profile2 = CategoricalColumn(df_categorical.name)
profile2.update(df_categorical)

Expand All @@ -787,40 +788,40 @@ def test_categorical_diff(self):
self.assertDictEqual(expected_diff, profile.diff(profile2))

def test_unalikeability(self):
df_categorical = pd.Series(["a", "a"])
df_categorical = pl.Series(["a", "a"])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)
self.assertEqual(profile.unalikeability, 0)

df_categorical = pd.Series(["a", "c", "b"])
df_categorical = pl.Series(["a", "c", "b"])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)
self.assertEqual(profile.unalikeability, 1)

df_categorical = pd.Series(["a", "a", "a", "b", "b", "b"])
df_categorical = pl.Series(["a", "a", "a", "b", "b", "b"])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)
self.assertEqual(profile.unalikeability, 18 / 30)

df_categorical = pd.Series(["a", "a", "b", "b", "b", "a", "c", "c", "a", "a"])
df_categorical = pl.Series(["a", "a", "b", "b", "b", "a", "c", "c", "a", "a"])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)
self.assertEqual(profile.unalikeability, 2 * (10 + 15 + 6) / 90)

df_categorical = pd.Series(["a"])
df_categorical = pl.Series(["a"])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)
self.assertEqual(0, profile.unalikeability)

df_categorical = pd.Series([])
df_categorical = pl.Series([])
profile = CategoricalColumn(df_categorical.name)
profile.update(df_categorical)
self.assertEqual(None, profile.unalikeability)

def test_top_k_categories_change(self):
# Test if top_k_categories is None
options = CategoricalOptions()
df_series = pd.Series(["a", "a", "b", "c", "d", "e", "e", "e", "f", "g"])
df_series = pl.Series(["a", "a", "b", "c", "d", "e", "e", "e", "f", "g"])
profile = CategoricalColumn(df_series.name, options)
profile.update(df_series)
self.assertEqual(len(profile.profile["statistics"]["categorical_count"]), 7)
Expand All @@ -831,7 +832,7 @@ def test_top_k_categories_change(self):

# Test if top_k_categories is greater than the count of categories
options.top_k_categories = 6
df_series = pd.Series(["a", "a", "b", "c", "d"])
df_series = pl.Series(["a", "a", "b", "c", "d"])
profile = CategoricalColumn(df_series.name, options)
profile.update(df_series)
self.assertEqual(len(profile.profile["statistics"]["categorical_count"]), 4)
Expand Down Expand Up @@ -947,7 +948,7 @@ def test_json_decode_after_update(self):
# Actual deserialization

# Build expected CategoricalColumn
df_categorical = pd.Series(
df_categorical = pl.Series(
[
"a",
"a",
Expand All @@ -973,7 +974,7 @@ def test_json_decode_after_update(self):

test_utils.assert_profiles_equal(deserialized, expected_profile)

df_categorical = pd.Series(
df_categorical = pl.Series(
[
"a", # add existing
"d", # add new
Expand All @@ -987,7 +988,7 @@ def test_json_decode_after_update(self):
assert deserialized.categorical_counts == {"c": 5, "b": 4, "a": 4, "d": 1}

def test_cms_max_num_heavy_hitters(self):
df_categorical = pd.Series(["a"] * 5 + ["b"] * 5 + ["c"] * 10)
df_categorical = pl.Series(["a"] * 5 + ["b"] * 5 + ["c"] * 10)

options = CategoricalOptions()
options.cms = True
Expand All @@ -1002,8 +1003,8 @@ def test_cms_max_num_heavy_hitters(self):
self.assertTrue(profile.sample_size >= 10)

def test_cms_update_hybrid_batch_stream(self):
dataset = pd.Series(["a"] * 7 + ["b"] * 9 + ["c"] * 14)
dataset1 = pd.Series(["a"] * 9 + ["b"] * 11 + ["c"] * 9 + ["d"] * 1)
dataset = pl.Series(["a"] * 7 + ["b"] * 9 + ["c"] * 14)
dataset1 = pl.Series(["a"] * 9 + ["b"] * 11 + ["c"] * 9 + ["d"] * 1)

options = CategoricalOptions()
options.cms = True
Expand Down Expand Up @@ -1031,8 +1032,8 @@ def test_cms_update_hybrid_batch_stream(self):

def test_cms_profile_merge_via_add(self):

dataset = pd.Series(["a"] * 9 + ["b"] * 12 + ["c"] * 9)
dataset1 = pd.Series(["a"] * 6 + ["b"] * 10 + ["c"] * 14)
dataset = pl.Series(["a"] * 9 + ["b"] * 12 + ["c"] * 9)
dataset1 = pl.Series(["a"] * 6 + ["b"] * 10 + ["c"] * 14)

expected_categories = ["b", "c"]
expected_categories_dict = {"b": 22, "c": 23}
Expand Down Expand Up @@ -1074,8 +1075,8 @@ def test_cms_profile_merge_via_add(self):

def test_cms_profile_min_max_num_heavy_hitters(self):

dataset = pd.Series(["a"] * 9 + ["b"] * 12 + ["c"] * 9)
dataset1 = pd.Series(["a"] * 6 + ["b"] * 10 + ["c"] * 14)
dataset = pl.Series(["a"] * 9 + ["b"] * 12 + ["c"] * 9)
dataset1 = pl.Series(["a"] * 6 + ["b"] * 10 + ["c"] * 14)

options = CategoricalOptions()
options.cms = True
Expand All @@ -1097,8 +1098,8 @@ def test_cms_profile_min_max_num_heavy_hitters(self):

def test_cms_catch_overwriting_with_missing_dict(self):

dataset = pd.Series(["b"] * 2 + ["c"] * 14)
dataset1 = pd.Series(["b"] * 5 + ["c"] * 10)
dataset = pl.Series(["b"] * 2 + ["c"] * 14)
dataset1 = pl.Series(["b"] * 5 + ["c"] * 10)

options = CategoricalOptions()
options.cms = True
Expand Down Expand Up @@ -1126,7 +1127,7 @@ def test_cms_catch_overwriting_with_missing_dict(self):

def test_cms_vs_full_mismatch_merge(self):

dataset = pd.Series(["b"] * 2 + ["c"] * 14)
dataset = pl.Series(["b"] * 2 + ["c"] * 14)

options = CategoricalOptions()
options.cms = True
Expand Down Expand Up @@ -1176,7 +1177,7 @@ def test_fewer_than_MAXIMUM_UNIQUE_VALUES_TO_CLASSIFY_AS_CATEGORICAL(self):
]

len_unique = len(set(cat_sentence_list))
cat_sentence_df = pd.Series(cat_sentence_list)
cat_sentence_df = pl.Series(cat_sentence_list)
column_profile = StructuredColProfiler(cat_sentence_df)
cat_profiler = column_profile.profiles["data_stats_profile"]._profiles[
"category"
Expand All @@ -1200,7 +1201,7 @@ def test_greater_than_CATEGORICAL_THRESHOLD_DEFAULT_identify_as_text(self):
)
cat_sentence_list = list_unique_values * num_sentences

cat_sentence_df = pd.Series(cat_sentence_list)
cat_sentence_df = pl.Series(cat_sentence_list)
column_profile = StructuredColProfiler(cat_sentence_df)
cat_profiler = column_profile.profiles["data_stats_profile"]._profiles[
"category"
Expand All @@ -1226,7 +1227,7 @@ def test_less_than_CATEGORICAL_THRESHOLD_DEFAULT(self):
cat_sentence_list = list_unique_values * num_sentences

len_unique = len(set(cat_sentence_list))
cat_sentence_df = pd.Series(cat_sentence_list)
cat_sentence_df = pl.Series(cat_sentence_list)
column_profile = StructuredColProfiler(cat_sentence_df)
cat_profiler = column_profile.profiles["data_stats_profile"]._profiles[
"category"
Expand Down Expand Up @@ -1255,7 +1256,7 @@ def test_uppercase_less_than_CATEGORICAL_THRESHOLD_DEFAULT(self):
cat_sentence_list[-3] = self.test_sentence_upper3 + str(num_sentences - 2)

len_unique = len(set(cat_sentence_list))
cat_sentence_df = pd.Series(cat_sentence_list)
cat_sentence_df = pl.Series(cat_sentence_list)
column_profile = StructuredColProfiler(cat_sentence_df)
cat_profiler = column_profile.profiles["data_stats_profile"]._profiles[
"category"
Expand All @@ -1279,7 +1280,7 @@ def test_long_sentences_fewer_than_MAXIMUM_UNIQUE_VALUES_TO_CLASSIFY_AS_CATEGORI
]

len_unique = len(set(cat_sentence_list))
cat_sentence_df = pd.Series(cat_sentence_list)
cat_sentence_df = pl.Series(cat_sentence_list)
column_profile = StructuredColProfiler(cat_sentence_df)
cat_profiler = column_profile.profiles["data_stats_profile"]._profiles[
"category"
Expand Down

0 comments on commit 8f8b528

Please sign in to comment.