Polars feature updates (#1119)
* Add polars to datetime_column_profile

* Polars added to unstructured labeler

* Quick fix for keras and tensorflow

* Add polars to unstructured text

* Correct polars usage

* Dask version

* Quick fix for keras and tensorflow

* Minor polars updates

* Change type for isinstance
abajpai15 authored Apr 22, 2024
1 parent e6dd865 commit 503efa2
Showing 6 changed files with 117 additions and 90 deletions.
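
The changes in this commit follow one recurring interop pattern: wrap an incoming pandas Series in a polars Series for the heavy lifting, then convert back to pandas or NumPy where downstream code still expects those types. A minimal sketch of that round trip, assuming polars and pandas are installed (the sample values are made up; to_pandas() typically needs pyarrow):

import pandas as pd
import polars as pl

# Start from a pandas Series, as the profilers receive today.
pd_series = pd.Series(["2024-01-01", "2024-04-22", None])

# Wrap it in polars for the columnar operations used below.
pl_series = pl.Series(pd_series)

print(pl_series.is_empty())    # polars' analogue of pandas `.empty`
print(pl_series.null_count())  # number of null entries

# Convert back when an API still expects pandas or NumPy.
back_to_pandas = pl_series.to_pandas()
as_numpy = pl_series.to_numpy()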
25 changes: 13 additions & 12 deletions dataprofiler/profilers/datetime_column_profile.py
@@ -7,6 +7,7 @@

import numpy as np
import pandas as pd
import polars as pl

from . import profiler_utils
from .base_column_profilers import BaseColumnPrimitiveTypeProfiler, BaseColumnProfiler
@@ -256,8 +257,7 @@ def _get_datetime_profile(cls, df_series: pd.Series) -> dict:
profile: dict = dict()
activated_date_formats: list = list()
len_df = len(df_series)

is_row_datetime = pd.Series(np.full((len(df_series)), False))
is_row_datetime = pd.Series(np.full((len_df), False))

min_value = None
max_value = None
@@ -275,18 +275,19 @@ def _get_datetime_profile(cls, df_series: pd.Series) -> dict:
)
)

df_dates = valid_dates[~valid_dates.isnull()]
df_dates = pl.Series(valid_dates[~valid_dates.isnull()])

if "%b" in date_format and not df_dates.empty:
if "%b" in date_format and not df_dates.is_empty():
may_month = 5 # May can be %b or %B we want to force, so check
all_may = df_dates.apply(lambda x: x.month == may_month).all()
all_may = df_dates.map_elements(lambda x: x.month == may_month)
all_may = pl.Series(all_may).all()
if all_may:
valid_dates[:] = np.nan
df_dates = pd.Series([], dtype=object)
valid_dates[:] = None
df_dates = pl.Series([])

# Create mask to avoid null dates
null_date_mask = valid_dates.isnull()
np_date_array = df_dates.values
np_date_array = df_dates.to_numpy()

# check off any values which were found to be datetime
is_row_datetime[~is_row_datetime] = (~null_date_mask).values
@@ -298,18 +299,18 @@ def _get_datetime_profile(cls, df_series: pd.Series) -> dict:
max_idx = np.argmax(np_date_array)

# Selects the min, max value objects for comparison
tmp_min_value_obj = df_dates.iloc[min_idx]
tmp_max_value_obj = df_dates.iloc[max_idx]
tmp_min_value_obj = df_dates.item(int(min_idx))
tmp_max_value_obj = df_dates.item(int(max_idx))

# If minimum value, keep reference
if tmp_min_value_obj < min_value_obj:
min_value = df_series[~null_date_mask].iloc[min_idx]
min_value_obj = tmp_min_value_obj
min_value_obj = pd.Timestamp(tmp_min_value_obj)

# If maximum value, keep reference
if tmp_max_value_obj > max_value_obj:
max_value = df_series[~null_date_mask].iloc[max_idx]
max_value_obj = tmp_max_value_obj
max_value_obj = pd.Timestamp(tmp_max_value_obj)

df_series = df_series[null_date_mask]

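For reference, a hedged sketch of the polars Series calls this file now relies on (map_elements, is_empty, to_numpy, item), shown on a made-up date series rather than the profiler's internals:

import datetime
import polars as pl

dates = pl.Series([datetime.datetime(2024, 5, 1), datetime.datetime(2024, 5, 15)])

# map_elements replaces pandas Series.apply for per-element Python callables.
all_may = dates.map_elements(lambda x: x.month == 5, return_dtype=pl.Boolean).all()
print(all_may)              # True

print(dates.is_empty())     # False; polars' analogue of pandas `.empty`
arr = dates.to_numpy()      # NumPy array, usable with np.argmin/np.argmax
print(dates.item(0))        # positional access, similar to pandas `.iloc[0]`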
2 changes: 1 addition & 1 deletion dataprofiler/profilers/profiler_utils.py
@@ -752,7 +752,7 @@ def perform_chi_squared_test_for_homogeneity(
# If one or less categories, we have zero/negative degrees of freedom,
# which is not an appropriate value for this context
num_cats = len(cat_counts)
if len(cat_counts) <= 1:
if num_cats <= 1:
warnings.warn(
"Insufficient number of categories. "
"Chi-squared test cannot be performed.",
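As context for the guard above: a homogeneity test over k categories has k - 1 degrees of freedom, so k <= 1 leaves nothing to test. A small illustrative check (not the profiler's API), assuming SciPy is available:

from scipy import stats

def homogeneity_dof(num_cats: int) -> int:
    # (2 samples - 1) * (num_cats - 1) degrees of freedom
    return num_cats - 1

print(stats.chi2.sf(3.84, df=homogeneity_dof(2)))  # p-value near 0.05
print(homogeneity_dof(1))  # 0 -> chi-squared test cannot be performed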
27 changes: 15 additions & 12 deletions dataprofiler/profilers/unstructured_labeler_profile.py
@@ -3,7 +3,7 @@

from collections import defaultdict

from pandas import Series
import polars as pl

from ..labelers.base_data_labeler import BaseDataLabeler
from ..labelers.data_labelers import DataLabeler
@@ -155,25 +155,27 @@ def label_encoding(self) -> list[str]:
return self.data_labeler.labels

@BaseColumnProfiler._timeit(name="data_labeler_predict")
def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
def _update_helper(self, df_series_clean: pl.Series, profile: dict) -> None:
"""
Update col profile properties with clean dataset and its known profile.
:param df_series_clean: df series with nulls removed
:type df_series_clean: pandas.core.series.Series
:type df_series_clean: polars.Series
:param profile: profile dictionary
:type profile: dict
:return: None
"""
data_ndarray = df_series_clean.to_numpy()

# this will get char_level predictions as output
predictions = self.data_labeler.predict(df_series_clean)
predictions = self.data_labeler.predict(data_ndarray)

# also store spacy/NER format
postprocessor = CharPostprocessor(
use_word_level_argmax=True, output_format="NER"
)
format_predictions = postprocessor.process(
df_series_clean, predictions.copy(), self.data_labeler.label_mapping
data_ndarray, predictions.copy(), self.data_labeler.label_mapping
)

# Update counts and percent values
@@ -188,14 +190,15 @@ def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
# CHARACTERS/WORDS PROCESSED
self._update_column_base_properties(profile)

def update(self, df_series: Series) -> None:
def update(self, df_series: pl.Series) -> None:
"""Update profile."""
if len(df_series) == 0:
return
profile = dict(
char_sample_size=self.char_sample_size,
word_sample_size=self.word_sample_size,
)

self._update_helper(df_series, profile)

@property
@@ -278,21 +281,21 @@ def _update_true_char_label_counts(self, predictions: list) -> None:
self.char_sample_size += len(sample)

def _update_postprocess_char_label_counts(
self, df_series_clean: Series, format_predictions: dict
self, df_series_clean: pl.Series, format_predictions: dict
) -> None:
"""
Update the postprocess character label counts.
:param df_series_clean: df series with nulls removed
:type df_series_clean: pandas.core.series.Series
:type df_series_clean: polars.Series
:param format_predictions: contains dict of samples with predictions on
the character level in congruence with the word level predictions
:type format_predictions: Dict
:return: None
"""
char_label_counts = self.entity_counts["postprocess_char_level"]

for index, result in enumerate(zip(df_series_clean, format_predictions)):
for result in zip(df_series_clean, format_predictions):
text, entities = result
index = 0
for entity in entities:
@@ -308,20 +311,20 @@ def _update_word_label_counts(
char_label_counts["UNKNOWN"] += len(text) - index

def _update_word_label_counts(
self, df_series_clean: Series, format_predictions: dict
self, df_series_clean: pl.Series, format_predictions: dict
) -> None:
"""
Update the sorted dictionary of each entity count.
:param df_series_clean: df series with nulls removed
:type df_series_clean: pandas.core.series.Series
:type df_series_clean: polars.Series
:param format_predictions: Dictionary of sample text and entities
:type format_predictions: dict
:return: None
"""
word_label_counts = self.entity_counts["word_level"]

for index, result in enumerate(zip(df_series_clean, format_predictions)):
for result in zip(df_series_clean, format_predictions):
text, entities = result
begin_word_idx = -1
index = 0
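A hedged sketch of the conversion pattern adopted above: the labeler's prediction path works on an array of raw text, so an incoming polars Series is converted with to_numpy() first. fake_predict below is a stand-in for illustration, not the DataProfiler labeler API:

import numpy as np
import polars as pl

def fake_predict(texts: np.ndarray) -> list:
    # Stand-in for a character-level labeler: tag every character UNKNOWN.
    return [["UNKNOWN"] * len(t) for t in texts]

samples = pl.Series(["call 555-0100", "email a@b.com"])

data_ndarray = samples.to_numpy()   # polars Series -> NumPy array of strings
predictions = fake_predict(data_ndarray)

# Iterate samples and predictions together, as the profiler's count updates do.
for text, labels in zip(samples, predictions):
    print(text, len(labels))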
53 changes: 38 additions & 15 deletions dataprofiler/profilers/unstructured_text_profile.py
@@ -6,6 +6,7 @@
import warnings
from collections import Counter, defaultdict

import polars as pl
from numpy import ndarray
from pandas import DataFrame, Series

@@ -667,15 +668,15 @@ def profile(self) -> dict:
@BaseColumnProfiler._timeit(name="vocab")
def _update_vocab(
self,
data: list | ndarray | DataFrame,
data: list | ndarray | DataFrame | pl.DataFrame,
prev_dependent_properties: dict = None,
subset_properties: dict = None,
) -> None:
"""
Find the vocabulary counts used in the text samples.
:param data: list or array of data from which to extract vocab
:type data: Union[list, numpy.array, pandas.DataFrame]
:type data: Union[list, numpy.array, pandas.DataFrame, polars.DataFrame]
:param prev_dependent_properties: Contains all the previous properties
that the calculations depend on.
:type prev_dependent_properties: dict
@@ -690,15 +691,15 @@ def _update_words(
@BaseColumnProfiler._timeit(name="words")
def _update_words(
self,
data: list | ndarray | DataFrame,
data: list | ndarray | DataFrame | pl.DataFrame,
prev_dependent_properties: dict = None,
subset_properties: dict = None,
) -> None:
"""
Find unique words and word count used in the text samples.
:param data: list or array of data from which to extract vocab
:type data: Union[list, numpy.array, pandas.DataFrame]
:type data: Union[list, numpy.array, pandas.DataFrame, polars.DataFrame]
:param prev_dependent_properties: Contains all the previous properties
that the calculations depend on.
:type prev_dependent_properties: dict
@@ -708,37 +709,54 @@ def _update_words(
:return: None
"""
if not self._is_case_sensitive:
words = (
[w.strip(string.punctuation) for w in row.lower().split()]
for row in data
)
if isinstance(data, pl.DataFrame):
words = (
[
w.strip(string.punctuation)
for w in row.str.to_lowercase().str.split(by=" ")
]
for row in data
)
else:
words = (
[w.strip(string.punctuation) for w in row.lower().split()]
for row in data
)
else:
words = ([w.strip(string.punctuation) for w in row.split()] for row in data)
if isinstance(data, pl.DataFrame):
words = (
[w.strip(string.punctuation) for w in row.str.split(by=" ")]
for row in data
)
else:
words = (
[w.strip(string.punctuation) for w in row.split()] for row in data
)
word_count = Counter(itertools.chain.from_iterable(words))

for w, c in word_count.items():
if w and w.lower() not in self._stop_words:
self.word_count.update({w: c})
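
For context on the branch added above: with plain Python strings each row is lowercased and split directly, while a polars string Series exposes the same operations through its .str namespace. A hedged sketch of both paths on made-up text (not the profiler's internals; note str.split(by=" ") splits on single spaces only):

import string
from collections import Counter

import polars as pl

rows = ["The quick, brown fox!", "the LAZY dog."]

# Plain-Python path: lowercase, split on whitespace, strip punctuation.
words_py = [w.strip(string.punctuation) for row in rows for w in row.lower().split()]

# polars path: the same operations via the Series.str namespace.
split_lists = pl.Series(rows).str.to_lowercase().str.split(by=" ")
words_pl = [w.strip(string.punctuation) for row in split_lists for w in row]

print(Counter(words_py) == Counter(words_pl))  # True for this example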

def _update_helper(self, data: Series, profile: dict) -> None:
def _update_helper(self, data: pl.Series, profile: dict) -> None:
"""
Update col profile properties with clean dataset and its known null parameters.
:param data: df series with nulls removed
:type data: pandas.core.series.Series
:type data: polars.Series
:param profile: text profile dictionary
:type profile: dict
:return: None
"""
self.sample_size += profile.pop("sample_size")
self.metadata = profile

def update(self, data: Series) -> TextProfiler:
def update(self, data: Series | pl.Series) -> TextProfiler:
"""
Update the column profile.
:param data: df series
:type data: pandas.core.series.Series
:type data: polars.Series
:return: updated TextProfiler
:rtype: TextProfiler
"""
@@ -748,14 +766,19 @@ def update(self, data: Series) -> TextProfiler:

profile = dict(sample_size=len_data)

if isinstance(data, pl.Series):
data_pandas = data.to_pandas()
else:
data_pandas = data

BaseColumnProfiler._perform_property_calcs(
self, # type: ignore
self.__calculations,
df_series=data,
df_series=data_pandas,
prev_dependent_properties={},
subset_properties=profile,
)

self._update_helper(data, profile)
self._update_helper(pl.Series(data), profile)

return self
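
The update path above accepts either a pandas or a polars Series and normalizes it before the per-property calculations run. A minimal hedged sketch of that normalization outside the profiler class (normalize_series is illustrative, and to_pandas() typically needs pyarrow):

from __future__ import annotations

import pandas as pd
import polars as pl

def normalize_series(data: pd.Series | pl.Series) -> tuple[pd.Series, pl.Series]:
    # Return the same data both as pandas (legacy calculations) and polars.
    if isinstance(data, pl.Series):
        data_pandas = data.to_pandas()
    else:
        data_pandas = data
    return data_pandas, pl.Series(data_pandas)

pd_view, pl_view = normalize_series(pl.Series(["some text", "more text"]))
print(len(pd_view), pl_view.is_empty())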