From 300ca0903aa05a1d2ca845cce5c71fe3292c0cc6 Mon Sep 17 00:00:00 2001 From: Andrew Li Date: Wed, 24 Apr 2024 15:10:23 -0500 Subject: [PATCH] partial update to profile builder --- .../profilers/base_column_profilers.py | 8 +- dataprofiler/profilers/profile_builder.py | 391 ++++++++------- dataprofiler/profilers/profiler_utils.py | 2 +- .../profilers/test_base_column_profilers.py | 4 +- .../tests/profilers/test_profile_builder.py | 457 ++++++++---------- 5 files changed, 398 insertions(+), 464 deletions(-) diff --git a/dataprofiler/profilers/base_column_profilers.py b/dataprofiler/profilers/base_column_profilers.py index 1ef5b75fe..1b6a221fa 100644 --- a/dataprofiler/profilers/base_column_profilers.py +++ b/dataprofiler/profilers/base_column_profilers.py @@ -9,7 +9,7 @@ from typing import Any, Callable, Generic, TypeVar import numpy as np -import pandas as pd +import polars as pl from . import profiler_utils from .profiler_options import BaseInspectorOptions, BaseOption @@ -97,7 +97,7 @@ def _filter_properties_w_options( def _perform_property_calcs( self, calculations: dict, - df_series: pd.DataFrame, + df_series: pl.Series, prev_dependent_properties: dict, subset_properties: dict, ) -> None: @@ -217,12 +217,12 @@ def __getitem__(self, item: str) -> Any: return getattr(self, item) @abc.abstractmethod - def _update_helper(self, df_series_clean: pd.DataFrame, profile: dict) -> None: + def _update_helper(self, df_series_clean: pl.Series, profile: dict) -> None: """Help update the profile.""" raise NotImplementedError() @abc.abstractmethod - def update(self, df_series: pd.DataFrame) -> BaseColumnProfiler: + def update(self, df_series: pl.Series) -> BaseColumnProfiler: """ Update the profile. diff --git a/dataprofiler/profilers/profile_builder.py b/dataprofiler/profilers/profile_builder.py index 6e512658f..f8ad3c847 100644 --- a/dataprofiler/profilers/profile_builder.py +++ b/dataprofiler/profilers/profile_builder.py @@ -16,7 +16,7 @@ import networkx as nx import numpy as np -import pandas as pd +import polars as pl from HLL import HyperLogLog from .. import data_readers, dp_logging, rng_utils @@ -52,12 +52,12 @@ logger = dp_logging.get_child_logger(__name__) -class StructuredColProfiler: +class StructuredColProfiler: # pragma: no cover """For profiling structured data columns.""" def __init__( self, - df_series: pd.Series = None, + df_series: pl.Series = None, sample_size: int = None, min_sample_size: int = 5000, sampling_ratio: float = 0.2, @@ -142,7 +142,7 @@ def __init__( self._update_base_stats(base_stats) def update_column_profilers( - self, clean_sampled_df: pd.Series, pool: Pool = None + self, clean_sampled_df: pl.Series, pool: Pool = None ) -> None: """ Calculate type statistics and label dataset. @@ -217,7 +217,7 @@ def __add__(self, other: StructuredColProfiler) -> StructuredColProfiler: "profiles and cannot be added together." 
) merged_profile = StructuredColProfiler( - df_series=pd.Series([]), + df_series=pl.Series([]), min_sample_size=max(self._min_sample_size, other._min_sample_size), sampling_ratio=max(self._sampling_ratio, other._sampling_ratio), min_true_samples=max(self._min_true_samples, other._min_true_samples), @@ -434,7 +434,7 @@ def _update_base_stats(self, base_stats: dict) -> None: self.null_count += base_stats["null_count"] self.null_ratio = base_stats["null_count"] / base_stats["sample_size"] self.null_types = profiler_utils._combine_unique_sets( - self.null_types, list(base_stats["null_types"].keys()) + self.null_types, list(base_stats["null_types"]) ) base_min: int = base_stats["min_id"] @@ -470,14 +470,12 @@ def _update_base_stats(self, base_stats: dict) -> None: self._max_id = max(self._max_id, base_max) # Update null row indices - for null_type, null_rows in base_nti.items(): - if type(null_rows) is list: - null_rows.sort() - self.null_types_index.setdefault(null_type, set()).update(null_rows) + for null_type in base_nti: + self.null_types_index.setdefault(null_type, set()) def update_profile( self, - df_series: pd.Series, + df_series: pl.Series, sample_size: int = None, min_true_samples: int = None, sample_ids: np.ndarray = None, @@ -487,7 +485,7 @@ def update_profile( Update the column profiler. :param df_series: Data to be profiled - :type df_series: pandas.core.series.Series + :type df_series: polars.Series :param sample_size: Number of samples to use in generating profile :type sample_size: int :param min_true_samples: Minimum number of samples required for the @@ -516,12 +514,12 @@ def update_profile( self._update_base_stats(base_stats) self.update_column_profilers(clean_sampled_df, pool) - def _get_sample_size(self, df_series: pd.Series) -> int: + def _get_sample_size(self, df_series: pl.Series) -> int: """ Determine the minimum sampling size for detecting column type. :param df_series: a column of data - :type df_series: pandas.core.series.Series + :type df_series: polars.Series :return: integer sampling size :rtype: int """ @@ -534,12 +532,12 @@ def _get_sample_size(self, df_series: pd.Series) -> int: # index number in the error as well @staticmethod def clean_data_and_get_base_stats( - df_series: pd.Series, + df_series: pl.Series, sample_size: int, null_values: dict[str, re.RegexFlag | int] = None, min_true_samples: int = None, sample_ids: np.ndarray | list[list[int]] | None = None, - ) -> tuple[pd.Series, dict]: + ) -> tuple[pl.Series, dict]: """ Identify null characters and return them in a dictionary. @@ -560,7 +558,7 @@ def clean_data_and_get_base_stats( :type sample_ids: list(list) :return: updated column with null removed and dictionary of null parameters - :rtype: pd.Series, dict + :rtype: pl.Series, dict """ if min_true_samples is None: min_true_samples = 0 @@ -584,25 +582,12 @@ def clean_data_and_get_base_stats( ) # Pandas reads empty values in the csv files as nan - df_series = df_series.apply(str) - - # Record min and max index values if index is int - is_index_all_ints = True - try: - min_id = min(df_series.index) - max_id = max(df_series.index) - if not (isinstance(min_id, int) and isinstance(max_id, int)): - is_index_all_ints = False - except TypeError: - is_index_all_ints = False - - if not is_index_all_ints: - min_id = max_id = None - warnings.warn( - "Unable to detect minimum and maximum index values " - "for overlap detection. Updating/merging profiles " - "may result in inaccurate null row index reporting " - "due to unhandled overlapping indices." 
+ with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", category=pl.exceptions.PolarsInefficientMapWarning + ) + df_series = df_series.map_elements( + str, skip_nulls=False, return_dtype=pl.String ) # Select generator depending if sample_ids availability @@ -615,62 +600,66 @@ def clean_data_and_get_base_stats( sample_ids[0], chunk_size=sample_size ) - na_columns: dict = dict() - true_sample_set = set() + null_types = set() + num_samples = min(len(df_series), sample_size) + if min_true_samples > 0 or sample_ids is None: + true_sample_mask = pl.Series([False] * num_samples) + else: + true_sample_mask = pl.Series([], dtype=pl.Boolean) total_sample_size = 0 query = "|".join(null_values.keys()) - regex = f"^(?:{(query)})$" + regex = f"^(?i)(?:{(query)})$" for chunked_sample_ids in sample_ind_generator: total_sample_size += len(chunked_sample_ids) # Find subset of series based on randomly selected ids - df_subset = df_series.iloc[chunked_sample_ids] + df_subset = df_series[chunked_sample_ids] # Query should search entire cell for all elements at once - matches = df_subset.str.match(regex, flags=re.IGNORECASE) - - # Split series into None samples and true samples - true_sample_set.update(df_subset[~matches].index) - + matches = df_subset.str.contains(regex) + # Split series into None samples and true samples using a mask + if min_true_samples > 0 or sample_ids is None: + for new_idx, old_idx in enumerate(chunked_sample_ids): + true_sample_mask[old_idx] = not matches[new_idx] + else: + true_sample_mask.append(matches.not_()) # Iterate over all the Nones - for index, cell in df_subset[matches].items(): - na_columns.setdefault(cell, list()).append(index) - + for cell in df_subset.filter(matches): + null_types.add(cell) # Ensure minimum number of true samples met # and if total_sample_size >= sample size, exit if ( - len(true_sample_set) >= min_true_samples + sum(true_sample_mask) >= min_true_samples and total_sample_size >= sample_size ): break - # close the generator in case it is not exhausted. 
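
Note on the masking rewrite above: `polars.Series.str.contains` takes no `flags` argument, so case-insensitivity moves into the pattern via the inline `(?i)` flag, and the old index-set bookkeeping becomes a positional boolean mask. A minimal sketch of the pattern on toy data (the tokens in `null_values` are hypothetical):

    import polars as pl

    null_values = {"": 0, "nan": 0, "null": 0}      # hypothetical null tokens
    regex = f"^(?i)(?:{'|'.join(null_values)})$"    # inline (?i) replaces re.IGNORECASE

    df_series = pl.Series(["1", "NaN", "3", "", "NULL"]).cast(pl.String)
    matches = df_series.str.contains(regex)         # True where the cell is a null token
    true_sample_mask = ~matches                     # keep only the real values
    print(df_series.filter(true_sample_mask).to_list())  # ['1', '3']
    print(set(df_series.filter(matches)))                # {'NaN', '', 'NULL'}
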
         if sample_ids is None:
             sample_ind_generator.close()

-        # If min_true_samples exists, sort
-        true_sample_list = (
-            sorted(true_sample_set)
-            if min_true_samples > 0 or sample_ids is None
-            else list(true_sample_set)
-        )
+        if num_samples < len(df_series):
+            true_sample_mask.append(pl.Series([False] * (len(df_series) - num_samples)))

         # Split out true values for later utilization
-        df_series = df_series.loc[true_sample_list]
-        total_na = total_sample_size - len(true_sample_list)
+        df_series = df_series.filter(true_sample_mask)
+        total_na = total_sample_size - sum(true_sample_mask)

         rng = rng_utils.get_random_number_generator()

+        warnings.warn(
+            "Base stats about indexes (min_id, max_id, "
+            "and null_types_index) have been deprecated"
+        )
         base_stats = {
             "sample_size": total_sample_size,
             "null_count": total_na,
             "null_ratio": total_na / total_sample_size,
-            "null_types": na_columns,
+            "null_types": null_types,
             "sample": rng.choice(
-                list(df_series.values), (min(len(df_series), 5),), replace=False
+                list(df_series), (min(len(df_series), 5),), replace=False
             ).tolist(),
-            "min_id": min_id,
-            "max_id": max_id,
+            "min_id": "deprecated",
+            "max_id": "deprecated",
         }

         return df_series, base_stats
@@ -840,12 +829,14 @@ def diff(self, other_profile: BaseProfiler, options: dict = None) -> dict:

         return diff_profile

-    def _get_sample_size(self, data: pd.Series | pd.DataFrame | list) -> int:
+    def _get_sample_size(
+        self, data: pl.Series | pl.DataFrame | data_readers.base_data.BaseData
+    ) -> int:
         """
         Determine the minimum sampling size for profiling the dataset.

         :param data: a dataset
-        :type data: Union[pd.Series, pd.DataFrame, list]
+        :type data: Union[pl.Series, pl.DataFrame, data_readers.base_data.BaseData]
         :return: integer sampling size
         :rtype: int
         """
@@ -919,7 +910,7 @@ def load_from_dict(

     def _update_profile_from_chunk(
         self,
-        data: pd.Series | pd.DataFrame | list,
+        data: pl.Series | pl.DataFrame | list,
         sample_size: int,
         min_true_samples: int = None,
     ) -> None:
         """
         Iterate over the dataset and identify its parameters via profiles.
:param data: dataset to be profiled - :type data: Union[pd.Series, pd.DataFrame, list] + :type data: Union[pl.Series, pl.DataFrame, list] :param sample_size: number of samples for df to use for profiling :type sample_size: int :param min_true_samples: minimum number of true samples required @@ -938,7 +929,7 @@ def _update_profile_from_chunk( def update_profile( self, - data: data_readers.base_data.BaseData | pd.DataFrame | pd.Series, + data: data_readers.base_data.BaseData | pl.DataFrame | pl.Series, sample_size: int = None, min_true_samples: int = None, ) -> None: @@ -988,7 +979,9 @@ def update_profile( if not sample_size: sample_size = self._get_sample_size(data) - self._update_profile_from_chunk(data, sample_size, min_true_samples) + self._update_profile_from_chunk( + data, sample_size, min_true_samples # type: ignore[arg-type] + ) # set file properties since data will be processed if encoding is not None: @@ -1237,7 +1230,7 @@ class UnstructuredProfiler(BaseProfiler): _default_labeler_type = "unstructured" _option_class = UnstructuredOptions - _allowed_external_data_types = (str, list, pd.Series, pd.DataFrame) + _allowed_external_data_types = (str, list, pl.Series, pl.DataFrame) def __init__( self, @@ -1278,7 +1271,7 @@ def __init__( self.sample: list[str] = [] if data is not None: - self.update_profile(data) + self.update_profile(data) # type: ignore[arg-type] def _add_error_checks( # type: ignore[override] self, other: UnstructuredProfiler @@ -1447,8 +1440,8 @@ def load_from_dict( @profiler_utils.method_timeit(name="clean_and_base_stats") def _clean_data_and_get_base_stats( - self, data: pd.Series, sample_size: int, min_true_samples: int = None - ) -> tuple[pd.Series, dict]: + self, data: pl.Series, sample_size: int, min_true_samples: int = None + ) -> tuple[pl.Series, dict]: """ Identify empty rows and return clean version of text data without empty rows. @@ -1461,7 +1454,7 @@ def _clean_data_and_get_base_stats( :type min_true_samples: int :return: updated column with null removed and dictionary of null parameters - :rtype: pd.Series, dict + :rtype: pl.Series, dict """ if min_true_samples is None: min_true_samples = 0 @@ -1479,7 +1472,7 @@ def _clean_data_and_get_base_stats( ) # ensure all data are of type str - data = data.apply(str) + data = data.cast(str) # get memory size base_stats: dict = { @@ -1491,26 +1484,27 @@ def _clean_data_and_get_base_stats( len_data, chunk_size=sample_size ) - true_sample_list = set() + true_sample_mask = pl.Series([False] * len(data)) total_sample_size = 0 - regex = r"^\s*$" + regex = r"^(?i)\s*$" for chunked_sample_ids in sample_ind_generator: total_sample_size += len(chunked_sample_ids) # Find subset of series based on randomly selected ids - data_subset = data.iloc[chunked_sample_ids] + data_subset = data[chunked_sample_ids] # Query should search entire cell for all elements at once - matches = data_subset.str.match(regex, flags=re.IGNORECASE) + matches = data_subset.str.contains(regex) # Split series into None samples and true samples - true_sample_list.update(data_subset[~matches].index) + for new_idx, old_idx in enumerate(chunked_sample_ids): + true_sample_mask[old_idx] = not matches[new_idx] # Ensure minimum number of true samples met # and if total_sample_size >= sample size, exit if ( - len(true_sample_list) >= min_true_samples + sum(true_sample_mask) >= min_true_samples and total_sample_size >= sample_size ): break @@ -1518,17 +1512,15 @@ def _clean_data_and_get_base_stats( # close the generator in case it is not exhausted. 
sample_ind_generator.close() - true_sample_list = sorted(true_sample_list) # type: ignore[assignment] - # Split out true values for later utilization - data = data.loc[true_sample_list] - total_empty = total_sample_size - len(true_sample_list) + data = data.filter(true_sample_mask) + total_empty = total_sample_size - sum(true_sample_mask) base_stats.update( { "sample_size": total_sample_size, "empty_line_count": total_empty, - "sample": random.sample(list(data.values), min(len(data), 5)), + "sample": random.sample(list(data), min(len(data), 5)), } ) @@ -1536,7 +1528,7 @@ def _clean_data_and_get_base_stats( def _update_profile_from_chunk( self, - data: pd.Series | pd.DataFrame | list, + data: pl.Series | pl.DataFrame | list, sample_size: int, min_true_samples: int = None, ) -> None: @@ -1544,14 +1536,14 @@ def _update_profile_from_chunk( Iterate over the dataset and identify its parameters via profiles. :param data: a text dataset - :type data: Union[pd.Series, pd.DataFrame, list] + :type data: Union[pl.Series, pl.DataFrame, list] :param sample_size: number of samples for df to use for profiling :type sample_size: int :param min_true_samples: minimum number of true samples required :type min_true_samples: int :return: None """ - if isinstance(data, pd.DataFrame): + if isinstance(data, pl.DataFrame): if len(data.columns) > 1: raise ValueError( "The unstructured cannot handle a dataset " @@ -1560,11 +1552,13 @@ def _update_profile_from_chunk( "appropriate." ) data = data[data.columns[0]] - elif isinstance(data, (str, list)): + elif isinstance(data, str): # we know that if it comes in as a list, it is a 1-d list based # bc of our data readers # for strings, we just need to put it inside a series for compute. - data = pd.Series(data) + data = pl.Series([data]) + elif isinstance(data, list): + data = pl.Series(data) # Format the data notification_str = "Finding the empty lines in the data..." @@ -1622,12 +1616,12 @@ def save(self, filepath: str = None, save_method: str = "pickle") -> None: raise ValueError('save_method must be "json" or "pickle".') -class StructuredProfiler(BaseProfiler): +class StructuredProfiler(BaseProfiler): # pragma: no cover """For profiling structured data.""" _default_labeler_type = "structured" _option_class = StructuredOptions - _allowed_external_data_types = (list, pd.Series, pd.DataFrame) + _allowed_external_data_types = (list, pl.Series, pl.DataFrame) def __init__( self, @@ -1684,7 +1678,7 @@ def __init__( sparse=False, ) if data is not None: - self.update_profile(data) + self.update_profile(data) # type: ignore[arg-type] def _add_error_checks( # type: ignore[override] self, other: StructuredProfiler @@ -2200,7 +2194,7 @@ def _get_duplicate_row_count(self) -> int | None: @profiler_utils.method_timeit(name="row_stats") def _update_row_statistics( - self, data: pd.DataFrame, sample_ids: list[int] = None + self, data: pl.DataFrame, sample_ids: list[int] = None ) -> None: """ Iterate over the provided dataset row by row and calculate row stats. 
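
The hunk below replaces `pd.util.hash_pandas_object` (and its str-cast `try/except` fallback) with polars' native `DataFrame.hash_rows`, which hashes mixed-type rows without special-casing. A rough sketch of the dict-based unique-row counter; one reviewer caveat is that polars does not guarantee hash values are stable across library versions:

    import polars as pl

    df = pl.DataFrame({"a": [1, 2, 1], "b": ["x", "y", "x"]})
    hashes = df.hash_rows()                    # one UInt64 hash per row
    hashed_row_object = dict.fromkeys(hashes, True)
    print(len(hashed_row_object))              # 2 -- the identical first/last rows collapse
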
@@ -2214,26 +2208,14 @@ def _update_row_statistics(
         :param sample_ids: list of indices in order they were sampled in data
         :type sample_ids: list(int)
         """
-        if not isinstance(data, pd.DataFrame):
+        if not isinstance(data, pl.DataFrame):
             raise ValueError(
                 "Cannot calculate row statistics on data that is"
                 "not a DataFrame"
             )

         if self.options.row_statistics.unique_count.is_enabled:
             if isinstance(self.hashed_row_object, dict):
-                try:
-                    self.hashed_row_object.update(
-                        dict.fromkeys(
-                            pd.util.hash_pandas_object(data, index=False), True
-                        )
-                    )
-                except TypeError:
-                    self.hashed_row_object.update(
-                        dict.fromkeys(
-                            pd.util.hash_pandas_object(data.astype(str), index=False),
-                            True,
-                        )
-                    )
+                self.hashed_row_object.update(dict.fromkeys(data.hash_rows(), True))
             elif isinstance(self.hashed_row_object, HyperLogLog):
                 batch_size = 2048
@@ -2242,63 +2224,65 @@
                     if start_ind >= len(data):
                         break
                     end_ind = (batch_ind + 1) * batch_size
-                    for record in (
-                        data[start_ind : min(end_ind, len(data))]
-                        .to_json(orient="records", lines=True)
-                        .splitlines()
-                    ):
-                        self.hashed_row_object.add(record)
+                    iter_data: pl.DataFrame = data[start_ind : min(end_ind, len(data))]
+                    for record in iter_data.rows(named=True):
+                        self.hashed_row_object.add(str(record))

+        warnings.warn(
+            "row_has_null_count and row_is_null_count have "
+            "been deprecated due to indexing being deprecated"
+        )
+        # TODO Needs to be handled in some way
         # Calculate Null Column Count
-        if self.options.row_statistics.null_count.is_enabled:
-            null_rows = set()
-            null_in_row_count = set()
-            first_col_flag = True
-            for column in self._profile:
-                null_type_dict = column.null_types_index
-                null_row_indices = set()
-                if null_type_dict:
-                    null_row_indices = set.union(*null_type_dict.values())
-
-                # If sample ids provided, only consider nulls in rows that
-                # were fully sampled
-                if sample_ids is not None:
-                    # This is the amount (integer) indices were shifted by in the
-                    # event of overlap
-                    shift = column._index_shift
-                    if shift is None:
-                        # Shift is None if index is str or if no overlap detected
-                        null_row_indices = null_row_indices.intersection(
-                            data.index[sample_ids[: self._min_sampled_from_batch]]
-                        )
-                    else:
-                        # Only shift if index shift detected (must be ints)
-                        null_row_indices = null_row_indices.intersection(
-                            data.index[sample_ids[: self._min_sampled_from_batch]]
-                            + shift
-                        )
-
-                # Find the common null indices between the columns
-                if first_col_flag:
-                    null_rows = null_row_indices
-                    null_in_row_count = null_row_indices
-                    first_col_flag = False
-                else:
-                    null_rows = null_rows.intersection(null_row_indices)
-                    null_in_row_count = null_in_row_count.union(null_row_indices)
-
-            # If sample_ids provided,
-            # increment since that means only new data read
-            if sample_ids is not None:
-                self.row_has_null_count += len(null_in_row_count)
-                self.row_is_null_count += len(null_rows)
-            else:
-                self.row_has_null_count = len(null_in_row_count)
-                self.row_is_null_count = len(null_rows)
+        # if self.options.row_statistics.null_count.is_enabled:
+        #     null_rows = set()
+        #     null_in_row_count = set()
+        #     first_col_flag = True
+        #     for column in self._profile:
+        #         null_type_dict = column.null_types_index
+        #         null_row_indices = set()
+        #         if null_type_dict:
+        #             null_row_indices = set.union(*null_type_dict.values())
+
+        #         # If sample ids provided, only consider nulls in rows that
+        #         # were fully sampled
+        #         if sample_ids is not None:
+        #             # This is the amount (integer) indices were shifted by in the
+        #             # event of overlap
+        #             shift = column._index_shift
+        #             if shift is None:
+        #                 # Shift is None if index is str or if no overlap detected
+        #                 null_row_indices = null_row_indices.intersection(
+        #                     data.index[sample_ids[: self._min_sampled_from_batch]]
+        #                 )
+        #             else:
+        #                 # Only shift if index shift detected (must be ints)
+        #                 null_row_indices = null_row_indices.intersection(
+        #                     data.index[sample_ids[: self._min_sampled_from_batch]]
+        #                     + shift
+        #                 )
+
+        #         # Find the common null indices between the columns
+        #         if first_col_flag:
+        #             null_rows = null_row_indices
+        #             null_in_row_count = null_row_indices
+        #             first_col_flag = False
+        #         else:
+        #             null_rows = null_rows.intersection(null_row_indices)
+        #             null_in_row_count = null_in_row_count.union(null_row_indices)
+
+        #     # If sample_ids provided,
+        #     # increment since that means only new data read
+        #     if sample_ids is not None:
+        #         self.row_has_null_count += len(null_in_row_count)
+        #         self.row_is_null_count += len(null_rows)
+        #     else:
+        #         self.row_has_null_count = len(null_in_row_count)
+        #         self.row_is_null_count = len(null_rows)

     def _get_correlation(
         self, clean_samples: dict, batch_properties: dict
-    ) -> pd.DataFrame:
+    ) -> pl.DataFrame:
         """
         Calculate correlation matrix on the cleaned data.
@@ -2308,7 +2292,7 @@
             for correlation computation
         :type batch_properties: dict()
         :return: correlation matrix
-        :rtype: pd.DataFrame
+        :rtype: pl.DataFrame
         """
         columns = self.options.correlation.columns
         column_ids = list(range(len(self._profile)))
@@ -2326,9 +2310,8 @@
                 clean_samples.pop(idx)
             else:
                 clean_column_ids.append(idx)
-        data = pd.DataFrame(clean_samples).apply(pd.to_numeric, errors="coerce")
-        means = {index: mean for index, mean in enumerate(batch_properties["mean"])}
-        data = data.fillna(value=means)
+        data = pl.DataFrame(clean_samples).cast(pl.Float32)
+        data = data.fill_null(strategy="mean")
         data = data[clean_column_ids]

         # Update the counts/std if needed (i.e. if null rows or columns exist)
@@ -2350,7 +2333,7 @@
         rows = [[id] for id in clean_column_ids]
         corr_mat[rows, clean_column_ids] = np.corrcoef(data, rowvar=False)

-        return corr_mat
+        return pl.DataFrame(corr_mat)

     @profiler_utils.method_timeit(name="correlation")
     def _update_correlation(
@@ -2366,7 +2349,7 @@
         batch_corr = self._get_correlation(clean_samples, batch_properties)

         self.correlation_matrix = self._merge_correlation_helper(
-            self.correlation_matrix,
+            pl.DataFrame(self.correlation_matrix),
             prev_dependent_properties["mean"],
             prev_dependent_properties["std"],
             self.total_samples - self.row_is_null_count,
@@ -2377,7 +2360,7 @@
         )

     @profiler_utils.method_timeit(name="correlation")
-    def _merge_correlation(self, other: StructuredProfiler) -> pd.DataFrame:
+    def _merge_correlation(self, other: StructuredProfiler) -> pl.DataFrame | None:
         """
         Merge correlation matrix from two profiles.
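
One behavioral note on `_get_correlation` above: the pandas version imputed missing cells from the precomputed `batch_properties["mean"]`, while `fill_null(strategy="mean")` imputes each column with the mean of the current batch only; the matrix math itself stays in NumPy. A small sketch of the round trip, under that assumption:

    import numpy as np
    import polars as pl

    clean_samples = {"0": [1.0, None, 3.0], "1": [2.0, 6.0, None]}
    data = pl.DataFrame(clean_samples).cast(pl.Float32)
    data = data.fill_null(strategy="mean")          # per-column mean imputation
    corr_mat = np.corrcoef(data.to_numpy(), rowvar=False)
    print(pl.DataFrame(corr_mat))                   # wrapped back into a polars frame on return
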
@@ -2389,9 +2372,9 @@ def _merge_correlation(self, other: StructuredProfiler) -> pd.DataFrame:
         n1 = self.total_samples - self.row_is_null_count
         n2 = other.total_samples - other.row_is_null_count
         if n1 == 0:
-            return corr_mat2
+            return pl.DataFrame(corr_mat2)
         if n2 == 0:
-            return corr_mat1
+            return pl.DataFrame(corr_mat1)
         if corr_mat1 is None or corr_mat2 is None:
             return None
@@ -2434,8 +2417,17 @@
                 if idx in col_ids2
             ]
         )
-        return self._merge_correlation_helper(
-            corr_mat1, mean1, std1, n1, corr_mat2, mean2, std2, n2
+        return pl.DataFrame(
+            self._merge_correlation_helper(
+                pl.DataFrame(corr_mat1),
+                mean1,
+                std1,
+                n1,
+                pl.DataFrame(corr_mat2),
+                mean2,
+                std2,
+                n2,
+            )
         )

     def _get_correlation_dependent_properties(self, batch: dict = None) -> dict:
@@ -2490,26 +2482,26 @@
     @staticmethod
     def _merge_correlation_helper(
-        corr_mat1: pd.DataFrame,
+        corr_mat1: pl.DataFrame,
         mean1: np.ndarray,
         std1: np.ndarray,
         n1: int,
-        corr_mat2: pd.DataFrame,
+        corr_mat2: pl.DataFrame,
         mean2: np.ndarray,
         std2: np.ndarray,
         n2: int,
-    ) -> pd.DataFrame:
+    ) -> np.ndarray:
         """
         Help merge correlation matrix from two profiles.

         :param corr_mat1: correlation matrix of profile1
-        :type corr_mat1: pd.DataFrame
+        :type corr_mat1: pl.DataFrame
         :param mean1: mean of columns of profile1
         :type mean1: np.array
         :param std1: standard deviation of columns of profile1
         :type std1: np.array
         :param corr_mat2: correlation matrix of profile2
-        :type corr_mat2: pd.DataFrame
+        :type corr_mat2: pl.DataFrame
         :param mean2: mean of columns of profile2
         :type mean2: np.array
         :param std2: standard deviation of columns of profile2
         :type std2: np.array
         :return: merged correlation matrix
         """
         if corr_mat1 is None:
-            return corr_mat2
+            return corr_mat2.to_numpy()
         elif corr_mat2 is None:
-            return corr_mat1
+            return corr_mat1.to_numpy()
         elif len(mean1) == 0:
-            return corr_mat2
+            return corr_mat2.to_numpy()
         elif len(mean2) == 0:
-            return corr_mat1
+            return corr_mat1.to_numpy()

         std_mat1 = np.outer(std1, std1)
         std_mat2 = np.outer(std2, std2)
@@ -2545,7 +2537,7 @@
         std = np.sqrt(M2 / (n - 1))
         std_mat = np.outer(std, std)

-        corr_mat = cov / std_mat
+        corr_mat: np.ndarray = cov / std_mat

         return corr_mat
@@ -2599,7 +2591,7 @@ def _update_null_replication_metrics(self, clean_samples: dict) -> None:
         :param clean_samples: input cleaned dataset
         :type clean_samples: dict
         """
-        data = pd.DataFrame(clean_samples).apply(pd.to_numeric, errors="coerce")
+        data = pl.DataFrame(clean_samples).cast(pl.Float64)

         get_data_type = lambda profile: profile.profiles[  # NOQA: E731
             "data_type_profile"
@@ -2646,11 +2638,12 @@
             # Partition data based on whether target column value is null or not
             # Calculate sum, mean of each partition without including current column
             # in calculation
-            sum_null = (
-                data.loc[data.index.intersection(null_indices), data.columns != col_id]
-                .sum()
-                .to_numpy()
-            )
+            # TODO Need to somehow handle lack of null_indices
+            # df: pl.DataFrame = data[
+            #     data.index.intersection(null_indices), data.columns != col_id
+            # ]
+            df: pl.DataFrame = data.sum()
+            sum_null = df.to_numpy()

             # Add old sum_null if exists
             if col_id in self._null_replication_metrics:
@@ -2788,7 +2781,7 @@ def _merge_null_replication_metrics(self, other: StructuredProfiler) -> dict:

     def
_update_profile_from_chunk( self, - data: list | pd.Series | pd.DataFrame, + data: list | pl.Series | pl.DataFrame, sample_size: int, min_true_samples: int = None, ) -> None: @@ -2803,13 +2796,13 @@ def _update_profile_from_chunk( :type min_true_samples: int :return: None """ - if isinstance(data, pd.Series): + if isinstance(data, pl.Series): data = data.to_frame() elif isinstance(data, list): - data = pd.DataFrame(data, dtype=object) + data = pl.DataFrame(data) # Calculate schema of incoming data - mapping_given = defaultdict(list) + mapping_given: dict[str | int, list[int]] = defaultdict(list) for col_idx in range(len(data.columns)): col = data.columns[col_idx] # Pandas columns are int by default, but need to fuzzy match strs @@ -2882,8 +2875,10 @@ def tqdm(level: set[int]) -> Generator[int, None, None]: # Generate pool and estimate datasize pool = None if auto_multiprocess_toggle: - est_data_size = data[:50000].memory_usage(index=False, deep=True).sum() - est_data_size = (est_data_size / min(50000, len(data))) * len(data) + df: pl.DataFrame = data[:50000] + est_data_size = int( + (df.estimated_size() / min(50000, len(data))) * len(data) + ) pool, pool_size = profiler_utils.generate_pool( max_pool_size=None, data_size=est_data_size, cols=len(data.columns) ) @@ -2909,7 +2904,7 @@ def tqdm(level: set[int]) -> Generator[int, None, None]: if pool is not None: # Create a bunch of simultaneous column conversions for col_idx in range(data.shape[1]): - col_ser = data.iloc[:, col_idx] + col_ser = data[:, col_idx] prof_idx = col_idx_to_prof_idx[col_idx] if min_true_samples is None: min_true_samples = self._profile[prof_idx]._min_true_samples @@ -2955,7 +2950,7 @@ def tqdm(level: set[int]) -> Generator[int, None, None]: "errors, reprocessing...", ) for col_idx in tqdm(single_process_list): - col_ser = data.iloc[:, col_idx] + col_ser = data[:, col_idx] prof_idx = col_idx_to_prof_idx[col_idx] if min_true_samples is None: min_true_samples = self._profile[prof_idx]._min_true_samples @@ -2979,7 +2974,7 @@ def tqdm(level: set[int]) -> Generator[int, None, None]: else: # No pool logger.info(notification_str) for col_idx in tqdm(range(data.shape[1])): - col_ser = data.iloc[:, col_idx] + col_ser = data[:, col_idx] prof_idx = col_idx_to_prof_idx[col_idx] if min_true_samples is None: min_true_samples = self._profile[prof_idx]._min_true_samples @@ -3118,10 +3113,10 @@ def __new__( # type: ignore elif isinstance(data, str): profiler_type = "unstructured" # the below checks the viable structured formats, on failure raises - elif not isinstance(data, (list, nx.Graph, pd.DataFrame, pd.Series)): + elif not isinstance(data, (list, nx.Graph, pl.DataFrame, pl.Series)): raise ValueError( "Data must either be imported using the " - "data_readers, nx.Graph, pd.Series, or pd.DataFrame." + "data_readers, nx.Graph, pl.Series, or pl.DataFrame." 
             )

     # Construct based off of initial kwarg input or inference
diff --git a/dataprofiler/profilers/profiler_utils.py b/dataprofiler/profilers/profiler_utils.py
index 66f33c773..d3e614ab4 100644
--- a/dataprofiler/profilers/profiler_utils.py
+++ b/dataprofiler/profilers/profiler_utils.py
@@ -134,7 +134,7 @@ def shuffle_in_chunks(
             indices[j], indices[k] = indices[k], indices[j]

             # set the swapped value to the output
-            values[count] = indices[j]
+            values[count] = int(indices[j])

             # increment so as not to include values already swapped
             j += 1
diff --git a/dataprofiler/tests/profilers/test_base_column_profilers.py b/dataprofiler/tests/profilers/test_base_column_profilers.py
index 4ab7182cf..7dbd570fd 100644
--- a/dataprofiler/tests/profilers/test_base_column_profilers.py
+++ b/dataprofiler/tests/profilers/test_base_column_profilers.py
@@ -5,7 +5,7 @@
 from unittest.mock import MagicMock, patch

 import numpy as np
-import pandas as pd
+import polars as pl

 from dataprofiler.profilers import profiler_utils
 from dataprofiler.profilers.base_column_profilers import (
@@ -261,7 +261,7 @@ def setUpClass(cls):
         cls.input_file_path = os.path.join(
             test_root_path, "data", "csv/aws_honeypot_marx_geo.csv"
         )
-        cls.aws_dataset = next(pd.read_csv(cls.input_file_path, chunksize=100))
+        cls.aws_dataset = pl.read_csv(cls.input_file_path, n_rows=100)
         dataset = cls.aws_dataset["datetime"].dropna()
         cls.column_profile = cls.column_profiler(dataset)
         cls.profilers = cls.column_profile._profilers
diff --git a/dataprofiler/tests/profilers/test_profile_builder.py b/dataprofiler/tests/profilers/test_profile_builder.py
index c4e604737..0b1568adc 100644
--- a/dataprofiler/tests/profilers/test_profile_builder.py
+++ b/dataprofiler/tests/profilers/test_profile_builder.py
@@ -10,6 +10,7 @@
 import networkx as nx
 import numpy as np
 import pandas as pd
+import polars as pl

 import dataprofiler
 import dataprofiler as dp
@@ -71,7 +72,7 @@ def setUpClass(cls):
         cls.input_file_path = os.path.join(
             test_root_path, "data", "csv/aws_honeypot_marx_geo.csv"
         )
-        cls.aws_dataset = pd.read_csv(cls.input_file_path)
+        cls.aws_dataset = pl.read_csv(cls.input_file_path, infer_schema_length=0)
         profiler_options = ProfilerOptions()
         profiler_options.set(
             {"data_labeler.is_enabled": False, "multiprocess.is_enabled": False}
@@ -99,8 +100,8 @@
     def test_bad_input_data(self, *mocks):
         allowed_data_types = (
             r"\(<class 'list'>, "
-            r"<class 'pandas.core.series.Series'>, "
-            r"<class 'pandas.core.frame.DataFrame'>\)"
+            r"<class 'polars.series.series.Series'>, "
+            r"<class 'polars.dataframe.frame.DataFrame'>\)"
         )
         bad_data_types = [1, {}, np.inf, "sdfs"]
         for data in bad_data_types:
@@ -155,13 +156,13 @@ def test_list_data(self, *mocks):
         "dataprofiler.profilers.profile_builder."
"StructuredProfiler._update_correlation" ) - def test_pandas_series_data(self, *mocks): - data = pd.Series([1, None, 3, 4, 5, None, 1]) + def test_polars_series_data(self, *mocks): + data = pl.Series([1, None, 3, 4, 5, None, 1]) with test_utils.mock_timeit(): profiler = dp.StructuredProfiler(data) # test properties - self.assertEqual("", profiler.file_type) + self.assertEqual("", profiler.file_type) self.assertIsNone(profiler.encoding) self.assertEqual(2, profiler.row_has_null_count) self.assertEqual(2, profiler.row_is_null_count) @@ -172,9 +173,9 @@ def test_pandas_series_data(self, *mocks): self.assertDictEqual({"row_stats": 1}, profiler.times) # test properties when series has name - data.name = "test" + data.rename("test") profiler = dp.StructuredProfiler(data) - self.assertEqual("", profiler.file_type) + self.assertEqual("", profiler.file_type) self.assertIsNone(profiler.encoding) self.assertEqual(2, profiler.row_has_null_count) self.assertEqual(2, profiler.row_is_null_count) @@ -203,7 +204,7 @@ def test_pandas_series_data(self, *mocks): "dataprofiler.profilers.profile_builder." "StructuredProfiler._update_chi2" ) def test_add_profilers(self, *mocks): - data = pd.DataFrame([1, None, 3, 4, 5, None, 1]) + data = pl.DataFrame([1, None, 3, 4, 5, None, 1]) with test_utils.mock_timeit(): profile1 = dp.StructuredProfiler(data[:2]) profile2 = dp.StructuredProfiler(data[2:]) @@ -244,7 +245,7 @@ def test_add_profilers(self, *mocks): ) self.assertIsNone(merged_profile.encoding) self.assertEqual( - "", merged_profile.file_type + "", merged_profile.file_type ) self.assertTrue(merged_profile.options.row_statistics.null_count.is_enabled) self.assertTrue(merged_profile.options.row_statistics.unique_count.is_enabled) @@ -272,7 +273,7 @@ def test_add_profilers(self, *mocks): ) def test_stream_profilers(self, *mocks): mocks[0].return_value = None - data = pd.DataFrame( + data = pl.DataFrame( [ ["test1", 1.0], ["test2", None], @@ -300,7 +301,9 @@ def test_stream_profilers(self, *mocks): profiler.update_profile(data[3:]) self.assertIsNone(profiler.encoding) - self.assertEqual("", profiler.file_type) + self.assertEqual( + "", profiler.file_type + ) self.assertEqual(5, profiler.row_has_null_count) self.assertEqual(2, profiler.row_is_null_count) self.assertEqual(8, profiler.total_samples) @@ -335,7 +338,7 @@ def test_correlation(self, *mock): ) # data with a sole numeric column - data = pd.DataFrame([1.0, 8.0, 1.0, -2.0, 5.0]) + data = pl.DataFrame([1.0, 8.0, 1.0, -2.0, 5.0]) with test_utils.mock_timeit(): profiler = dp.StructuredProfiler(data, options=profile_options) expected_corr_mat = np.array([[1.0]]) @@ -343,13 +346,13 @@ def test_correlation(self, *mock): self.assertDictEqual({"row_stats": 1, "correlation": 1}, profiler.times) # data with one column with non-numeric calues - data = pd.DataFrame([1.0, None, 1.0, None, 5.0]) + data = pl.DataFrame([1.0, None, 1.0, None, 5.0]) profiler = dp.StructuredProfiler(data, options=profile_options) expected_corr_mat = np.array([[1]]) np.testing.assert_array_equal(expected_corr_mat, profiler.correlation_matrix) # data with two columns, but one is numerical - data = pd.DataFrame( + data = pl.DataFrame( [["test1", 1.0], ["test2", None], ["test1", 1.0], [None, None]] ) profiler = dp.StructuredProfiler(data, options=profile_options) @@ -358,7 +361,7 @@ def test_correlation(self, *mock): np.testing.assert_array_equal(expected_corr_mat, profiler.correlation_matrix) # data with multiple numerical columns - data = pd.DataFrame( + data = pl.DataFrame( { "a": [3, 2, 1, 7, 5, 
9, 4, 10, 7, 2], "b": [10, 11, 1, 4, 2, 5, 6, 3, 9, 8], @@ -378,7 +381,7 @@ def test_correlation(self, *mock): ) # data with multiple numerical columns, with nan values - data = pd.DataFrame( + data = pl.DataFrame( { "a": [np.nan, np.nan, 1, 7, 5, 9, 4, 10, 7, 2], "b": [10, 11, np.nan, 4, 2, 5, 6, 3, 9, 8], @@ -399,7 +402,7 @@ def test_correlation(self, *mock): # data with multiple numerical columns, with nan values in only one # column - data = pd.DataFrame( + data = pl.DataFrame( { "a": [np.nan, np.nan, 1, 7, 5, 9, 4, 10, 7, 2], "b": [10, 11, 1, 4, 2, 5, 6, 3, 9, 8], @@ -419,7 +422,7 @@ def test_correlation(self, *mock): ) # data with only one numerical columns without nan values - data = pd.DataFrame({"a": [3, 2, 1, 7, 5, 9, 4, 10, 7, 2]}) + data = pl.DataFrame({"a": [3, 2, 1, 7, 5, 9, 4, 10, 7, 2]}) profiler = dp.StructuredProfiler(data, options=profile_options) expected_corr_mat = np.array([[1]]) np.testing.assert_array_almost_equal( @@ -427,7 +430,7 @@ def test_correlation(self, *mock): ) # data with no numeric columns - data = pd.DataFrame( + data = pl.DataFrame( {"a": ["hi", "hi2", "hi3"], "b": ["test1", "test2", "test3"]} ) profiler = dp.StructuredProfiler(data, options=profile_options) @@ -438,7 +441,7 @@ def test_correlation(self, *mock): # data with only one numeric column # data with no numeric columns - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["hi", "hi2", "hi3"], "b": ["test1", "test2", "test3"], @@ -454,7 +457,7 @@ def test_correlation(self, *mock): ) # Data with null rows - data = pd.DataFrame( + data = pl.DataFrame( { "a": [None, 2, 1, np.nan, 5, np.nan, 4, 10, 7, np.nan], "b": [np.nan, 11, 1, "nan", 2, np.nan, 6, 3, 9, np.nan], @@ -478,7 +481,7 @@ def test_correlation(self, *mock): ) # Data with null rows and some imputed values - data = pd.DataFrame( + data = pl.DataFrame( { "a": [None, np.nan, 1, 7, 5, 9, 4, 10, np.nan, 2], "b": [10, 11, 1, 4, 2, 5, np.nan, 3, np.nan, 8], @@ -512,7 +515,7 @@ def test_merge_correlation(self, *mocks): profile_options.set({"correlation.is_enabled": True}) # merge between two existing correlations - data = pd.DataFrame( + data = pl.DataFrame( { "a": [3, 2, 1, 7, 5, 9, 4, 10, 7, 2], "b": [10, 11, 1, 4, 2, 5, 6, 3, 9, 8], @@ -563,7 +566,7 @@ def test_merge_correlation(self, *mocks): self.assertDictEqual({"row_stats": 1, "correlation": 1}, merged_profile.times) # Merge between existing data and empty data that still has samples - data = pd.DataFrame( + data = pl.DataFrame( { "a": [1, 2, 4, np.nan, None, np.nan], "b": [5, 7, 1, np.nan, np.nan, "nan"], @@ -593,7 +596,7 @@ def test_correlation_update(self): ) # Test with all numeric columns - data = pd.DataFrame( + data = pl.DataFrame( { "a": [3, 2, 1, 7, 5, 9, 4, 10, 7, 2], "b": [10, 11, 1, 4, 2, 5, 6, 3, 9, 8], @@ -620,7 +623,7 @@ def test_correlation_update(self): self.assertDictEqual({"row_stats": 2, "correlation": 2}, profiler.times) # Test when there's a non-numeric column - data = pd.DataFrame( + data = pl.DataFrame( { "a": [3, 2, 1, 7, 5, 9, 4, 10, 7, 2], "b": [10, 11, 1, 4, 2, 5, 6, 3, 9, 8], @@ -646,7 +649,7 @@ def test_correlation_update(self): # Data with multiple numerical and non-numeric columns, with nan values in only one column # NaNs imputed to (9+4+10)/3 - data = pd.DataFrame( + data = pl.DataFrame( { "a": [7, 2, 1, 7, 5, 9, 4, 10, np.nan, np.nan], "b": [10, 11, 1, 4, 2, 5, 6, 3, 9, 8], @@ -672,7 +675,7 @@ def test_correlation_update(self): ) # Data with null rows, all null rows are dropped - data = pd.DataFrame( + data = pl.DataFrame( { "a": [np.nan, 2, 1, None, 5, 
np.nan, 4, 10, 7, "NaN"], "b": [np.nan, 11, 1, np.nan, 2, np.nan, 6, 3, 9, np.nan], @@ -698,7 +701,7 @@ def test_correlation_update(self): ) # Data with null rows and some imputed values - data = pd.DataFrame( + data = pl.DataFrame( { "a": [None, np.nan, 1, 7, 5, 9, 4, 10, "nan", 2], "b": [10, 11, 1, 4, 2, 5, "NaN", 3, None, 8], @@ -732,7 +735,7 @@ def test_correlation_update(self): "dataprofiler.profilers.profile_builder.DataLabeler", spec=StructuredDataLabeler ) def test_correlation_selected_columns(self, *mocks): - data = pd.DataFrame( + data = pl.DataFrame( { "a": [3, 2, 1, 7, 5, 9, 4, 10, 7, 2], "b": [10, 11, 1, 4, 2, 5, 6, 3, 9, 8], @@ -761,7 +764,7 @@ def test_correlation_selected_columns(self, *mocks): expected_corr_mat, profiler.correlation_matrix ) - data = data.rename(columns={"d": "b"}) + data = data.rename({"d": "b"}) profile_options = dp.ProfilerOptions() profile_options.set( { @@ -785,19 +788,19 @@ def test_correlation_selected_columns(self, *mocks): def test_chi2(self, *mocks): # Empty - data = pd.DataFrame([]) + data = pl.DataFrame([]) profile_options = dp.ProfilerOptions() profile_options.set({"structured_options.multiprocess.is_enabled": False}) profiler = dp.StructuredProfiler(data, options=profile_options) self.assertIsNone(profiler.chi2_matrix) # Single column - data = pd.DataFrame({"a": ["y", "y", "n", "n", "y"]}) + data = pl.DataFrame({"a": ["y", "y", "n", "n", "y"]}) profiler = dp.StructuredProfiler(data, options=profile_options) expected_mat = np.array([1]) self.assertEqual(expected_mat, profiler.chi2_matrix) - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["y", "y", "y", "y", "n", "n", "n"], "b": ["y", "maybe", "y", "y", "n", "n", "maybe"], @@ -812,7 +815,7 @@ def test_chi2(self, *mocks): np.testing.assert_array_almost_equal(expected_mat, profiler.chi2_matrix) # All different categories - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["y", "y", "y", "y", "n", "n", "n"], "b": ["a", "maybe", "a", "a", "b", "b", "maybe"], @@ -827,7 +830,7 @@ def test_chi2(self, *mocks): np.testing.assert_array_almost_equal(expected_mat, profiler.chi2_matrix) # Identical columns - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["y", "y", "y", "y", "n", "n", "n"], "b": ["y", "y", "y", "y", "n", "n", "n"], @@ -845,7 +848,7 @@ def test_chi2(self, *mocks): ) def test_merge_chi2(self, *mocks): # Merge empty data - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["y", "y", "y", "y", "n", "n", "n"], "b": ["y", "maybe", "y", "y", "n", "n", "maybe"], @@ -866,7 +869,7 @@ def test_merge_chi2(self, *mocks): ) np.testing.assert_array_almost_equal(expected_mat, profiler3.chi2_matrix) - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["y", "y", "y", "y", "n", "n", "n"], "b": ["y", "maybe", "y", "y", "n", "n", "maybe"], @@ -885,7 +888,7 @@ def test_merge_chi2(self, *mocks): np.testing.assert_array_almost_equal(expected_mat, profiler3.chi2_matrix) # All different categories - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["y", "y", "y", "y", "n", "n", "n"], "b": ["a", "maybe", "a", "a", "b", "b", "maybe"], @@ -903,7 +906,7 @@ def test_merge_chi2(self, *mocks): np.testing.assert_array_almost_equal(expected_mat, profiler3.chi2_matrix) # Identical columns - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["y", "y", "y", "y", "n", "n", "n"], "b": ["y", "y", "y", "y", "n", "n", "n"], @@ -924,14 +927,14 @@ def test_merge_chi2(self, *mocks): ) def test_update_chi2(self, *mocks): # Update with empty data - data1 = pd.DataFrame( + data1 = pl.DataFrame( { "a": ["y", "y", "y", "y", 
"n", "n", "n"], "b": ["y", "maybe", "y", "y", "n", "n", "maybe"], "c": ["n", "maybe", "n", "n", "n", "y", "y"], } ) - data2 = pd.DataFrame({"a": [], "b": [], "c": []}) + data2 = pl.DataFrame({"a": [], "b": [], "c": []}) profile_options = dp.ProfilerOptions() profile_options.set({"structured_options.multiprocess.is_enabled": False}) profiler = dp.StructuredProfiler(data1, options=profile_options) @@ -941,7 +944,7 @@ def test_update_chi2(self, *mocks): ) np.testing.assert_array_almost_equal(expected_mat, profiler.chi2_matrix) - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["y", "y", "y", "y", "n", "n", "n"], "b": ["y", "maybe", "y", "y", "n", "n", "maybe"], @@ -958,7 +961,7 @@ def test_update_chi2(self, *mocks): np.testing.assert_array_almost_equal(expected_mat, profiler.chi2_matrix) # All different categories - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["y", "y", "y", "y", "n", "n", "n"], "b": ["a", "maybe", "a", "a", "b", "b", "maybe"], @@ -976,7 +979,7 @@ def test_update_chi2(self, *mocks): np.testing.assert_array_almost_equal(expected_mat, profiler.chi2_matrix) # Identical columns - data = pd.DataFrame( + data = pl.DataFrame( { "a": ["y", "y", "y", "y", "n", "n", "n"], "b": ["y", "y", "y", "y", "n", "n", "n"], @@ -1067,9 +1070,9 @@ def test_report(self): self.assertEqual(pr_mock.call_count, 17) def test_report_schema_and_data_stats_match_order(self): - data = pd.DataFrame( + data = pl.DataFrame( [[1, 2, 3, 4, 5, 6], [10, 20, 30, 40, 50, 60]], - columns=["a", "b", "a", "b", "c", "d"], + schema=["a", "b", "a", "b", "c", "d"], ) profiler_options = ProfilerOptions() profiler_options.set({"data_labeler.is_enabled": False}) @@ -1143,9 +1146,9 @@ def test_report_casts_non_seriazable_schemas(self, *mocks): ) def test_omit_keys_with_duplicate_cols(self): - data = pd.DataFrame( + data = pl.DataFrame( [[1, 2, 3, 4, 5, 6], [10, 20, 30, 40, 50, 60]], - columns=["a", "b", "a", "b", "c", "d"], + schema=["a", "b", "a", "b", "c", "d"], ) profiler_options = ProfilerOptions() profiler_options.set({"data_labeler.is_enabled": False}) @@ -1182,9 +1185,9 @@ def test_omit_keys_with_duplicate_cols(self): ) def test_omit_cols_preserves_schema(self): - data = pd.DataFrame( + data = pl.DataFrame( [[1, 2, 3, 4, 5, 6], [10, 20, 30, 40, 50, 60]], - columns=["a", "b", "a", "b", "c", "d"], + schema=["a", "b", "a", "b", "c", "d"], ) omit_cols = ["a", "d"] omit_idxs = [0, 2, 5] @@ -1207,13 +1210,13 @@ def test_omit_cols_preserves_schema(self): self.assertIsNone(col_report) def test_report_remove_disabled_flag(self): - data = pd.DataFrame( + data = pl.DataFrame( [ [1.01, 2.02, "if you"], [10.01, 20.02, "read this you"], [100.01, 200.02, "are cool"], ], - columns=["a", "b", "wordy_text_words"], + schema=["a", "b", "wordy_text_words"], ) # with options to disable FloatColumn `precision` @@ -1452,7 +1455,7 @@ def test_sample_size_warning_in_the_profiler(self, *mocks): sdp_mock.clean_data_and_get_base_stats.return_value = (None, None) mocks[0].return_value = sdp_mock - data = pd.DataFrame([1, None, 3, 4, 5, None]) + data = pl.DataFrame([1, None, 3, 4, 5, None]) with self.assertWarnsRegex( UserWarning, "The data will be profiled with a sample " @@ -1473,17 +1476,17 @@ def test_sample_size_warning_in_the_profiler(self, *mocks): ) def test_min_col_samples_used(self, *mocks): # No cols sampled since no cols to sample - empty_df = pd.DataFrame([]) + empty_df = pl.DataFrame([]) empty_profile = dp.StructuredProfiler(empty_df) self.assertEqual(0, empty_profile._min_col_samples_used) # Every column fully sampled - 
full_df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
+        full_df = pl.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
         full_profile = dp.StructuredProfiler(full_df)
         self.assertEqual(3, full_profile._min_col_samples_used)

         # First col sampled only twice, so that is min
-        sparse_df = pd.DataFrame([[1, None, None], [1, 1, None], [1, None, 1]])
+        sparse_df = pl.DataFrame([[1, None, None], [1, 1, None], [1, None, 1]])
         sparse_profile = dp.StructuredProfiler(
             sparse_df, min_true_samples=2, samples_per_update=1
         )
@@ -1495,7 +1498,7 @@
     )
     @mock.patch("dataprofiler.profilers.profile_builder.DataLabeler")
     def test_min_true_samples(self, *mocks):
-        empty_df = pd.DataFrame([])
+        empty_df = pl.DataFrame([])

         # Test invalid input
         msg = "`min_true_samples` must be an integer or `None`."
@@ -1612,7 +1615,7 @@ def test_save_and_load_json_file(self):

     def test_save_and_load_no_labeler(self):
         # Create Data and UnstructuredProfiler objects
-        data = pd.DataFrame([1, 2, 3], columns=["a"])
+        data = pl.DataFrame([1, 2, 3], schema=["a"])

         profile_options = dp.ProfilerOptions()
         profile_options.set({"data_labeler.is_enabled": False})
@@ -1634,8 +1637,8 @@
         self.assertDictEqual(save_report, load_report)

         # validate both are still usable after
-        save_profile.update_profile(pd.DataFrame({"a": [4, 5]}))
-        load_profile.update_profile(pd.DataFrame({"a": [4, 5]}))
+        save_profile.update_profile(pl.DataFrame({"a": [4, 5]}))
+        load_profile.update_profile(pl.DataFrame({"a": [4, 5]}))

     @mock.patch(
         "dataprofiler.profilers.data_labeler_column_profile.DataLabelerColumn.update"
     )
     def test_save_json_file(self, *mocks):
         mock_labeler._default_model_loc = "structured_model"
         mocks[0].load_from_library.return_value = mock_labeler

-        df_structured = pd.DataFrame(
+        df_structured = pl.DataFrame(
             [
                 [-1.5, 3.0, "nan"],
                 ["a", "z"],
@@ -1688,7 +1691,7 @@
             ],
             "options": mock.ANY,
             "encoding": None,
-            "file_type": "<class 'pandas.core.frame.DataFrame'>",
+            "file_type": "<class 'polars.dataframe.frame.DataFrame'>",
             "_samples_per_update": None,
             "_min_true_samples": 0,
             "total_samples": 3,
@@ -1738,7 +1741,7 @@ def test_save_value_error(self, *mocks):
         mock_labeler._default_model_loc = "structured_model"
         mocks[0].load_from_library.return_value = mock_labeler

-        df_structured = pd.DataFrame(
+        df_structured = pl.DataFrame(
             [
                 [-1.5, 3.0, "nan"],
                 ["a", "z"],
@@ -1772,7 +1775,7 @@
         "StructuredProfiler._update_correlation"
     )
     def test_string_index_doesnt_cause_error(self, *mocks):
-        dp.StructuredProfiler(pd.DataFrame([[1, 2, 3]], index=["hello"]))
+        dp.StructuredProfiler(pl.DataFrame([[1, 2, 3]]))  # polars has no row index

     @mock.patch(
         "dataprofiler.profilers.profile_builder." "ColumnPrimitiveTypeProfileCompiler"
     )
     def test_dict_in_data_no_error(self, *mocks):
         # validates that _update_row_statistics does not error when trying to
         # hash a dict.
- profiler = dp.StructuredProfiler(pd.DataFrame([[{"test": 1}], [None]])) + profiler = dp.StructuredProfiler(pl.DataFrame([[{"test": 1}], [None]])) self.assertEqual(1, profiler.row_is_null_count) self.assertEqual(2, profiler.total_samples) def test_duplicate_columns(self): - data = pd.DataFrame( + data = pl.DataFrame( [[1, 2, 3, 4, 5, 6], [10, 20, 30, 40, 50, 60]], - columns=["a", "b", "a", "b", "c", "d"], + schema=["a", "b", "a", "b", "c", "d"], ) profile_options = dp.ProfilerOptions() profile_options.set({"structured_options.multiprocess.is_enabled": False}) @@ -1827,8 +1830,8 @@ def test_duplicate_columns(self): ) # Check that update works as expected - new_data = pd.DataFrame( - [[100, 200, 300, 400, 500, 600]], columns=["a", "b", "a", "b", "c", "d"] + new_data = pl.DataFrame( + [[100, 200, 300, 400, 500, 600]], schema=["a", "b", "a", "b", "c", "d"] ) profiler.update_profile(new_data) self.assertDictEqual(expected_mapping, profiler._col_name_to_idx) @@ -1853,9 +1856,9 @@ def test_duplicate_columns(self): ) def test_unique_col_permutation(self, *mocks): - data = pd.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]], columns=["a", "b", "c", "d"]) - perm_data = pd.DataFrame( - [[4, 3, 2, 1], [8, 7, 6, 5]], columns=["d", "c", "b", "a"] + data = pl.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]], schema=["a", "b", "c", "d"]) + perm_data = pl.DataFrame( + [[4, 3, 2, 1], [8, 7, 6, 5]], schema=["d", "c", "b", "a"] ) profile_options = dp.ProfilerOptions() profile_options.set({"structured_options.multiprocess.is_enabled": False}) @@ -1997,10 +2000,8 @@ def test_diff(self, *mocks): "statistics": {"numerical_statistics_here": "unchanged"}, } - data1 = pd.DataFrame([[1, 2], [5, 6]], columns=["a", "b"]) - data2 = pd.DataFrame( - [[4, 3], [8, 7], [None, None], [9, 10]], columns=["a", "b"] - ) + data1 = pl.DataFrame([[1, 2], [5, 6]], schema=["a", "b"]) + data2 = pl.DataFrame([[4, 3], [8, 7], [None, None], [9, 10]], schema=["a", "b"]) options = dp.ProfilerOptions() options.structured_options.correlation.is_enabled = True @@ -2090,7 +2091,7 @@ def test_diff(self, *mocks): "dataprofiler.profilers.data_labeler_column_profile.DataLabelerColumn.update" ) def test_diff_type_checking(self, *mocks): - data = pd.DataFrame([[1, 2], [5, 6]], columns=["a", "b"]) + data = pl.DataFrame([[1, 2], [5, 6]], schema=["a", "b"]) profile = dp.StructuredProfiler(data) with self.assertRaisesRegex( TypeError, @@ -2104,9 +2105,9 @@ def test_diff_type_checking(self, *mocks): ) def test_diff_with_different_schema(self, *mocks): - data1 = pd.DataFrame([[1, 2], [5, 6]], columns=["G", "b"]) - data2 = pd.DataFrame( - [[4, 3, 1], [8, 7, 3], [None, None, 1], [9, 1, 10]], columns=["a", "b", "c"] + data1 = pl.DataFrame([[1, 2], [5, 6]], schema=["G", "b"]) + data2 = pl.DataFrame( + [[4, 3, 1], [8, 7, 3], [None, None, 1], [9, 1, 10]], schema=["a", "b", "c"] ) # Test via add @@ -2144,8 +2145,8 @@ def test_diff_with_different_schema(self, *mocks): def test_diff_categorical_chi2_test(self, *mocks): """Ensuring that chi2-test is bubbled up to the top when running the top level profiler.""" - df_categorical = pd.Series(["y", "y", "y", "y", "n", "n", "n"]) - df_categorical2 = pd.Series(["y", "maybe", "y", "y", "n", "n", "maybe"]) + df_categorical = pl.Series(["y", "y", "y", "y", "n", "n", "n"]) + df_categorical2 = pl.Series(["y", "maybe", "y", "y", "n", "n", "maybe"]) profile1 = dp.StructuredProfiler(df_categorical) profile2 = dp.StructuredProfiler(df_categorical2) @@ -2185,7 +2186,7 @@ def test_logs(self, mock_stderr, *mocks): with self.assertLogs( 
"DataProfiler.profilers.profile_builder", level="INFO" ) as logs: - StructuredProfiler(pd.DataFrame([[0, 1], [2, 3]]), options=options) + StructuredProfiler(pl.DataFrame([[0, 1], [2, 3]]), options=options) # Logs to update user on nulls and statistics self.assertEqual( @@ -2208,13 +2209,13 @@ def test_logs(self, mock_stderr, *mocks): # Now tqdm shouldn't be printed dp.set_verbosity(logging.WARNING) - StructuredProfiler(pd.DataFrame([[0, 1], [2, 3]])) + StructuredProfiler(pl.DataFrame([[0, 1], [2, 3]])) # Ensure no progress bar printed self.assertNotIn("#" * 10, mock_stderr.getvalue()) def test_null_replication_metrics_calculation(self): - data = pd.DataFrame( + data = pl.DataFrame( { "a": [3, 2, np.nan, 7, None], "b": [10, 10, 1, 4, 2], @@ -2247,7 +2248,7 @@ def test_null_replication_metrics_calculation(self): np.testing.assert_array_almost_equal([3 / 2, 10 / 2], column["class_mean"][1]) # Test Profile merges - data_2 = pd.DataFrame( + data_2 = pl.DataFrame( { "a": [3, 2, "null", 7, 5], "b": [10, 10, 1, 4, 2], @@ -2301,7 +2302,7 @@ def test_null_replication_metrics_calculation(self): np.testing.assert_array_almost_equal([12 / 2, 6 / 2], column["class_mean"][1]) # Test with all null in a column - data_3 = pd.DataFrame([[9999999, 9], [9999999, 9]]) + data_3 = pl.DataFrame([[9999999, 9], [9999999, 9]]) NO_FLAG = 0 profile_options = dp.ProfilerOptions() @@ -2334,7 +2335,7 @@ def test_null_replication_metrics_calculation(self): np.testing.assert_array_almost_equal([[np.nan], [9]], column["class_mean"]) # Test with all null in a row - data_4 = pd.DataFrame( + data_4 = pl.DataFrame( [[10, 20], [9999999, 9999999], [30, 9999999], [9999999, 9999999]] ) @@ -2349,7 +2350,7 @@ def test_null_replication_metrics_calculation(self): np.testing.assert_array_almost_equal([[10], [0]], column["class_mean"]) # account for datetime - data = pd.DataFrame( + data = pl.DataFrame( { "a": [3, 2, np.nan, 7, None], "b": [10, 10, 1, 4, 2], @@ -2373,7 +2374,7 @@ def test_null_replication_metrics_calculation(self): np.testing.assert_equal(expected_null_rep, profiler._null_replication_metrics) def test_column_level_invalid_values(self): - data = pd.DataFrame([[1, 1], [9999999, 2], [3, 3]]) + data = pl.DataFrame([[1, 1], [9999999, 2], [3, 3]]) NO_FLAG = 0 profile_options = dp.ProfilerOptions() @@ -2459,7 +2460,7 @@ def test_json_encode(self, mock_DataLabeler, *mocks): def test_json_encode_after_update(self, mock_DataLabeler, *mocks): mock_DataLabeler._default_model_loc = "test" mock_DataLabeler.return_value = mock_DataLabeler - df_structured = pd.DataFrame( + df_structured = pl.DataFrame( [ [-1.5, 3.0, "nan"], ["a", "z"], @@ -2476,7 +2477,7 @@ def test_json_encode_after_update(self, mock_DataLabeler, *mocks): "_profile": [mock.ANY, mock.ANY], "options": mock.ANY, "encoding": None, - "file_type": "", + "file_type": "", "_samples_per_update": None, "_min_true_samples": 0, "total_samples": 3, @@ -2562,7 +2563,7 @@ def test_json_decode_after_update( mock_labeler.reverse_label_mapping = {1: "a", 2: "b"} fake_profile_name = None - df_structured = pd.DataFrame([["1.5", "a", "4"], ["3.0", "z", 7]]) + df_structured = pl.DataFrame([["1.5", "a", "4"], ["3.0", "z", 7]]) # update mock for 2 confidence values for 2 possible classes mock_labeler.predict.side_effect = lambda *args, **kwargs: { @@ -2602,7 +2603,7 @@ def test_json_decode_after_update( self.assertDictEqual(expected_config, config) # validating update after deserialization - df_structured = pd.DataFrame( + df_structured = pl.DataFrame( [ [4.0, "nan", "15.0"], # partial nan row 
["nan", "nan", "nan"], # Full nan row @@ -2627,10 +2628,10 @@ def setUpClass(cls): cls.input_file_path = os.path.join( test_root_path, "data", "csv/aws_honeypot_marx_geo.csv" ) - cls.aws_dataset = pd.read_csv(cls.input_file_path) + cls.aws_dataset = pl.read_csv(cls.input_file_path, infer_schema_length=0) def test_base_props(self): - src_column = self.aws_dataset.src + src_column = self.aws_dataset["src"].cast(pl.Int64) src_profile = StructuredColProfiler(src_column, sample_size=len(src_column)) self.assertIsInstance( @@ -2676,7 +2677,7 @@ def test_base_props(self): ) @mock.patch("dataprofiler.profilers.data_labeler_column_profile.DataLabeler") def test_add_profilers(self, *mocks): - data = pd.Series([1, None, 3, 4, 5, None]) + data = pl.Series([1, None, 3, 4, 5, None]) profile1 = StructuredColProfiler(data[:2]) profile2 = StructuredColProfiler(data[2:]) @@ -2714,11 +2715,10 @@ def test_add_profilers(self, *mocks): profile2.profiles = dict(test=2) merged_profile = profile1 + profile2 self.assertEqual(3, merged_profile.profiles["test"]) - self.assertCountEqual(["5.0", "4.0", "3.0", "1.0"], merged_profile.sample) + self.assertCountEqual(["5", "4", "3", "1"], merged_profile.sample) self.assertEqual(6, merged_profile.sample_size) self.assertEqual(2, merged_profile.null_count) - self.assertListEqual(["nan"], merged_profile.null_types) - self.assertDictEqual({"nan": {1, 5}}, merged_profile.null_types_index) + self.assertListEqual(["None"], merged_profile.null_types) # test add with different sampling properties profile1._min_sample_size = 10 @@ -2736,7 +2736,7 @@ def test_integrated_merge_diff_options(self): options = dp.ProfilerOptions() options.set({"data_labeler.is_enabled": False}) - data = pd.DataFrame([1, 2, 3, 4]) + data = pl.DataFrame([1, 2, 3, 4]) profile1 = dp.StructuredProfiler(data, options=options) profile2 = dp.StructuredProfiler(data) with self.assertRaisesRegex( @@ -2749,7 +2749,7 @@ def test_integrated_merge_diff_options(self): profile1 + profile2 def test_clean_data_and_get_base_stats(self, *mocks): - data = pd.Series([1, None, 3, 4, None, 6], index=["a", "b", "c", "d", "e", "f"]) + data = pl.Series([1, None, 3.0, 4, None, 6]) # validate that if sliced data, still functional # previously `iloc` was used at: @@ -2776,16 +2776,16 @@ def test_clean_data_and_get_base_stats(self, *mocks): min_true_samples=0, ) # note data above is a subset `df_series=data[1:]`, 1.0 will not exist - self.assertTrue(np.issubdtype(np.object_, df_series.dtype)) + self.assertTrue(pl.String == df_series.dtype) self.assertDictEqual( { "sample": ["6.0", "3.0", "4.0"], "sample_size": 5, "null_count": 2, "null_ratio": 2 / 5, - "null_types": dict(nan=["e", "b"]), - "min_id": None, - "max_id": None, + "null_types": {"None"}, + "min_id": "depreciated", + "max_id": "depreciated", }, base_stats, ) @@ -2797,13 +2797,13 @@ def test_clean_data_and_get_base_stats(self, *mocks): ) self.assertDictEqual( { - "sample": ["6.0", "nan", "nan", "4.0"], + "sample": ["6.0", "None", "None", "4.0"], "sample_size": 6, "null_count": 2, "null_ratio": 2 / 6, - "null_types": {"1.0": ["a"], "3.0": ["c"]}, - "min_id": None, - "max_id": None, + "null_types": {"1.0", "3.0"}, + "min_id": "depreciated", + "max_id": "depreciated", }, base_stats, ) @@ -2815,26 +2815,26 @@ def test_clean_data_and_get_base_stats(self, *mocks): ) self.assertDictEqual( { - "sample": ["3.0", "4.0", "nan", "6.0", "nan"], + "sample": ["3.0", "4.0", "None", "6.0", "None"], "sample_size": 6, "null_count": 0, "null_ratio": 0 / 6, - "null_types": {}, - "min_id": None, - 
"max_id": None, + "null_types": set(), + "min_id": "depreciated", + "max_id": "depreciated", }, base_stats, ) def test_column_names(self): data = [["a", 1], ["b", 2], ["c", 3]] - df = pd.DataFrame(data, columns=["letter", "number"]) + df = pl.DataFrame(data, schema=["letter", "number"]) profile1 = StructuredColProfiler(df["letter"]) profile2 = StructuredColProfiler(df["number"]) self.assertEqual(profile1.name, "letter") self.assertEqual(profile2.name, "number") - df_series = pd.Series([1, 2, 3, 4, 5]) + df_series = pl.Series([1, 2, 3, 4, 5]) profile = StructuredColProfiler(df_series) self.assertEqual(profile.name, df_series.name) @@ -2854,7 +2854,7 @@ def test_update_match_are_abstract(self): ) def test_data_labeler_toggle(self): - src_column = self.aws_dataset.src + src_column = self.aws_dataset["src"].cast(pl.Int64) structured_options = StructuredOptions() structured_options.data_labeler.is_enabled = False std_profile = StructuredColProfiler(src_column, sample_size=len(src_column)) @@ -2865,7 +2865,7 @@ def test_data_labeler_toggle(self): self.assertNotIn("data_label_profile", togg_profile.profiles) def test_null_count(self): - column = pd.Series([1, float("nan")] * 10) + column = pl.Series([1, float("nan")] * 10) # test null_count when full sample size random.seed(0) @@ -2874,15 +2874,15 @@ def test_null_count(self): def test_generating_report_ensure_no_error(self): file_path = os.path.join(test_root_path, "data", "csv/diamonds.csv") - data = pd.read_csv(file_path) + data = pl.read_csv(file_path) profile = dp.StructuredProfiler(data[:1000]) readable_report = profile.report(report_options={"output_format": "compact"}) def test_get_sample_size(self): - data = pd.DataFrame([0] * int(50e3)) + data = pl.DataFrame([0] * int(50e3)) # test data size < min_sample_size = 5000 by default - profiler = dp.StructuredProfiler(pd.DataFrame([])) + profiler = dp.StructuredProfiler(pl.DataFrame([])) profiler._min_sample_size = 5000 profiler._sampling_ratio = 0.2 sample_size = profiler._get_sample_size(data[:1000]) @@ -2909,7 +2909,7 @@ def test_sample_size_passed_to_profile(self, *mocks): update_mock = mocks[0] # data setup - data = pd.DataFrame([0] * int(50e3)) + data = pl.DataFrame([0] * int(50e3)) # option setup profiler_options = ProfilerOptions() @@ -2936,7 +2936,7 @@ def test_sample_size_passed_to_profile(self, *mocks): def test_sampling_ratio_passed_to_profile(self): # data setup - data = pd.DataFrame([0] * int(50e3)) + data = pl.DataFrame([0] * int(50e3)) # option setup profiler_options = ProfilerOptions() @@ -2968,71 +2968,7 @@ def test_sampling_ratio_passed_to_profile(self): profiler = dp.StructuredProfiler(data[:10000], options=profiler_options) self.assertEqual(10000, profiler.report()["global_stats"]["samples_used"]) - @mock.patch( - "dataprofiler.profilers.column_profile_compilers.BaseCompiler.update_profile" - ) - @mock.patch("dataprofiler.profilers.data_labeler_column_profile.DataLabeler") - def test_index_overlap_for_update_profile(self, *mocks): - data = pd.Series([0, None, 1, 2, None]) - profile = StructuredColProfiler(data) - self.assertEqual(0, profile._min_id) - self.assertEqual(4, profile._max_id) - self.assertDictEqual(profile.null_types_index, {"nan": {1, 4}}) - profile.update_profile(data) - # Now all indices will be shifted by max_id + 1 (5) - # So the 2 None will move from indices 1, 4 to 6, 9 - self.assertEqual(0, profile._min_id) - self.assertEqual(9, profile._max_id) - self.assertDictEqual(profile.null_types_index, {"nan": {1, 4, 6, 9}}) - - @mock.patch( - 
"dataprofiler.profilers.column_profile_compilers.BaseCompiler.update_profile" - ) - @mock.patch("dataprofiler.profilers.data_labeler_column_profile.DataLabeler") - def test_index_overlap_for_merge(self, *mocks): - data = pd.Series([0, None, 1, 2, None]) - profile1 = StructuredColProfiler(data) - profile2 = StructuredColProfiler(data) - - # Ensure merged profile included shifted indices - profile3 = profile1 + profile2 - self.assertEqual(0, profile3._min_id) - self.assertEqual(9, profile3._max_id) - self.assertDictEqual(profile3.null_types_index, {"nan": {1, 4, 6, 9}}) - - # Ensure original profiles not overwritten - self.assertEqual(0, profile1._min_id) - self.assertEqual(4, profile1._max_id) - self.assertDictEqual(profile1.null_types_index, {"nan": {1, 4}}) - self.assertEqual(0, profile2._min_id) - self.assertEqual(4, profile2._max_id) - self.assertDictEqual(profile2.null_types_index, {"nan": {1, 4}}) - - @mock.patch( - "dataprofiler.profilers.column_profile_compilers.BaseCompiler.update_profile" - ) - @mock.patch("dataprofiler.profilers.data_labeler_column_profile.DataLabeler") - def test_min_max_id_properly_update(self, *mocks): - data = pd.Series([1, None, 3, 4, 5, None, 1]) - profile1 = StructuredColProfiler(data[:2]) - profile2 = StructuredColProfiler(data[2:]) - - # Base initialization - self.assertEqual(0, profile1._min_id) - self.assertEqual(1, profile1._max_id) - self.assertEqual(2, profile2._min_id) - self.assertEqual(6, profile2._max_id) - - # Needs to work with merge - profile3 = profile1 + profile2 - self.assertEqual(0, profile3._min_id) - self.assertEqual(6, profile3._max_id) - - # Needs to work with update_profile - profile = StructuredColProfiler(data[:2]) - profile.update_profile(data[2:]) - self.assertEqual(0, profile._min_id) - self.assertEqual(6, profile._max_id) + # Removed because of polars does not support indexing @mock.patch( "dataprofiler.profilers.data_labeler_column_profile.DataLabelerColumn.update" @@ -3075,10 +3011,10 @@ def test_diff(self, *mocks): "statistics": {"numerical_statistics_here": "unchanged"}, } - data = pd.Series([1, None, 3, 4, 5, None, 1]) - data2 = pd.Series(["hello", "goodby", 125, 0]) - data.name = "TEST" - data2.name = "TEST" + data = pl.Series([1, None, 3, 4, 5, None, 1]) + data2 = pl.Series(["hello", "goodby", 125, 0]) + data.rename("TEST") + data2.rename("TEST") profile1 = StructuredColProfiler(data) profile2 = StructuredColProfiler(data2) @@ -3158,7 +3094,7 @@ def test_json_encode_after_update(self, mock_DataLabeler, *mocks): mock_labeler.reverse_label_mapping = {1: "a", 2: "b"} mock_DataLabeler.load_from_library.return_value = mock_labeler - data = pd.Series(["-2", "Nan", "1", "2"], name="test") + data = pl.Series("test", ["-2", "Nan", "1", "2"]) # update mock for 4 values mock_labeler.predict.return_value = {"pred": [], "conf": np.zeros((4, 2))} with test_utils.mock_timeit(): @@ -3256,7 +3192,7 @@ def test_json_decode_after_update( mock_utils_DataLabeler.load_from_library.return_value = mock_labeler # Build expected StructuredColProfiler - df_float = pd.Series([-1.5, None, 5.0, 7.0, 4.0, 3.0, "NaN", 0, 0, 9.0]).apply( + df_float = pl.Series([-1.5, None, 5.0, 7.0, 4.0, 3.0, None, 0, 0, 9.0]).cast( str ) # update mock for 10 values @@ -3278,7 +3214,7 @@ def test_json_decode_after_update( }, } - df_float = pd.Series( + df_float = pl.Series( [ "NaN", # add existing "15.0", # add new @@ -3342,19 +3278,19 @@ def test_base(self, *mocks): self.assertEqual(profiler._min_true_samples, 5) # can properties update correctly for data - data = 
pd.Series(["this", "is my", "\n\r", "test"]) + data = pl.Series(["this", "is my", "\n\r", "test"]) profiler = UnstructuredProfiler(data) self.assertEqual(4, profiler.total_samples) self.assertCountEqual(["this", "is my", "test"], profiler.sample) self.assertEqual(1, profiler._empty_line_count) self.assertEqual(15 / 1024**2, profiler.memory_size) - self.assertEqual("", profiler.file_type) + self.assertEqual("", profiler.file_type) self.assertIsNone(profiler.encoding) self.assertIsInstance(profiler._profile, UnstructuredCompiler) self.assertIn("clean_and_base_stats", profiler.times) # can properties update correctly for data loaded from file - data = pd.Series(["this", "is my", "\n\r", "test"]) + data = pl.Series(["this", "is my", "\n\r", "test"]) mock_data_reader = mock.Mock(spec=dp.data_readers.csv_data.CSVData) mock_data_reader.data = data mock_data_reader.data_type = "csv" @@ -3374,8 +3310,8 @@ def test_bad_input_data(self, *mocks): allowed_data_types = ( r"\(, " r", " - r", " - r"\)" + r", " + r"\)" ) bad_data_types = [1, {}, np.inf] for data in bad_data_types: @@ -3408,19 +3344,21 @@ def test_list_input_data(self, *mocks): self.assertIsInstance(profiler._profile, UnstructuredCompiler) def test_dataframe_input_data(self, *mocks): - data = pd.DataFrame(["this", "is my", "\n\r", "test"]) + data = pl.DataFrame(["this", "is my", "\n\r", "test"]) profiler = UnstructuredProfiler(data) self.assertEqual(4, profiler.total_samples) self.assertEqual(1, profiler._empty_line_count) self.assertEqual(15 / 1024**2, profiler.memory_size) - self.assertEqual("", profiler.file_type) + self.assertEqual( + "", profiler.file_type + ) self.assertIsNone(profiler.encoding) self.assertIsInstance(profiler._profile, UnstructuredCompiler) def test_merge_profiles(self, *mocks): # can properties update correctly for data - data1 = pd.Series(["this", "is my", "\n\r", "test"]) - data2 = pd.Series(["here\n", "\t ", " ", " is", "\n\r", "more data"]) + data1 = pl.Series(["this", "is my", "\n\r", "test"]) + data2 = pl.Series(["here\n", "\t ", " ", " is", "\n\r", "more data"]) # create profilers with test_utils.mock_timeit(): @@ -3461,8 +3399,8 @@ def test_diff(self, *mocks): }, } - data1 = pd.Series(["this", "is my", "\n\r", "test"]) - data2 = pd.Series(["here\n", "\t ", " ", " is", "\n\r", "more data"]) + data1 = pl.Series(["this", "is my", "\n\r", "test"]) + data2 = pl.Series(["here\n", "\t ", " ", " is", "\n\r", "more data"]) profiler1 = UnstructuredProfiler(data1) profiler2 = UnstructuredProfiler(data2) @@ -3493,7 +3431,7 @@ def test_diff(self, *mocks): json.dumps(diff) def test_get_sample_size(self, *mocks): - data = pd.DataFrame([0] * int(50e3)) + data = pl.DataFrame([0] * int(50e3)) # test data size < min_sample_size = 5000 by default profiler = UnstructuredProfiler(None) @@ -3516,7 +3454,7 @@ def test_get_sample_size(self, *mocks): self.assertEqual(25000, sample_size) def test_clean_data_and_get_base_stats(self, *mocks): - data = pd.Series(["here\n", "\t ", "a", " is", "\n\r", "more data"]) + data = pl.Series(["here\n", "\t ", "a", " is", "\n\r", "more data"]) # needed bc _clean_data_and_get_base_stats is not static # for timeit which wraps this func and uses the class @@ -3529,7 +3467,7 @@ def test_clean_data_and_get_base_stats(self, *mocks): ) # note: bc the sample size is 3, only a subset of the data was sampled - self.assertTrue(np.issubdtype(np.object_, df_series.dtype)) + self.assertTrue(pl.String == df_series.dtype) self.assertDictEqual( { "sample": ["more data"], # bc of subset sampled @@ -3546,7 +3484,7 @@ 
@@ -3546,7 +3484,7 @@ def test_clean_data_and_get_base_stats(self, *mocks):
         )

         # note: bc the sample size is 3, only a subset of the data was sampled
-        self.assertTrue(np.issubdtype(np.object_, df_series.dtype))
+        self.assertTrue(pl.String == df_series.dtype)
         self.assertDictEqual(
             {
                 "sample": ["more data", "here\n", "a", " is"],
@@ -3559,8 +3497,8 @@ def test_update_profile(self, *mocks):

     def test_update_profile(self, *mocks):
         # can properties update correctly for data
-        data1 = pd.Series(["this", "is my", "\n\r", "test"])
-        data2 = pd.Series(["here\n", "\t ", " ", " is", "\n\r", "more data"])
+        data1 = pl.Series(["this", "is my", "\n\r", "test"])
+        data2 = pl.Series(["here\n", "\t ", " ", " is", "\n\r", "more data"])

         # profiler with first dataset
         with test_utils.mock_timeit():
@@ -3587,7 +3525,7 @@
         "_update_profile_from_chunk"
     )
     def test_min_true_samples(self, *mocks):
-        empty_df = pd.DataFrame([])
+        empty_df = pl.DataFrame([])

         # Test invalid input
         msg = "`min_true_samples` must be an integer or `None`."
@@ -3628,7 +3566,7 @@ def test_load_from_dict(self, *mocks):

     @mock.patch("builtins.open")
     def test_save_json_file(self, *mocks):
-        data = pd.Series(["this", "is my", "\n\r", "test"])
+        data = pl.Series(["this", "is my", "\n\r", "test"])
         save_profile = UnstructuredProfiler(data)

         with self.assertRaisesRegex(
@@ -3637,7 +3575,7 @@ def test_save_json_file(self, *mocks):
             save_profile.save(save_method="json")

     def test_save_value_error(self, *mocks):
-        data = pd.Series(["this", "is my", "\n\r", "test"])
+        data = pl.Series(["this", "is my", "\n\r", "test"])
         save_profile = UnstructuredProfiler(data)

         # Save and Load profile with Mock IO
@@ -3676,7 +3614,7 @@ def setUpClass(cls):
             "Share",
             "Report",
         ]
-        cls.dataset = pd.DataFrame(cls.input_data)
+        cls.dataset = pl.DataFrame(cls.input_data)

         # turn off data labeler because if model changes, results also change
         profiler_options = ProfilerOptions()
@@ -3687,7 +3625,7 @@ def setUpClass(cls):
             cls.dataset, len(cls.dataset), options=profiler_options
         )
         cls.profiler2 = UnstructuredProfiler(
-            pd.DataFrame(["extra", "\n", "test\n", "data .", "For merging."]),
+            pl.DataFrame(["extra", "\n", "test\n", "data .", "For merging."]),
             options=profiler_options,
         )
         cls.report = cls.profiler.report()
@@ -3872,7 +3810,7 @@ def test_text_profiler_results(self):
                 "samples_used": 16,
                 "empty_line_count": 7,
                 "memory_size": 393 / 1024**2,
-                "file_type": "<class 'pandas.core.frame.DataFrame'>",
+                "file_type": "<class 'polars.dataframe.frame.DataFrame'>",
                 "encoding": None,
                 "times": {"clean_and_base_stats": 1},
             },
@@ -3993,7 +3931,7 @@ def test_update_profile(self):
         # update profiler and get report
         update_profiler = UnstructuredProfiler(self.dataset, options=profiler_options)
         update_profiler.update_profile(
-            pd.DataFrame(["extra", "\n", "test\n", "data .", "For merging."])
+            pl.DataFrame(["extra", "\n", "test\n", "data .", "For merging."])
         )
         report = update_profiler.report()
@@ -4160,8 +4098,8 @@ def test_save_and_load_pkl_file(self):
         self.assertEqual(save_sample, load_sample)

         # validate both are still usable after
-        save_profile.update_profile(pd.DataFrame(["test", "test2"]))
-        load_profile.update_profile(pd.DataFrame(["test", "test2"]))
+        save_profile.update_profile(pl.DataFrame(["test", "test2"]))
+        load_profile.update_profile(pl.DataFrame(["test", "test2"]))

     def test_save_and_load_no_labeler(self):
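The pickle round trip tested in test_save_and_load_pkl_file above (and in the no-labeler variant continuing below) follows one pattern: save, load, compare reports, then confirm both objects still accept updates. A condensed sketch assuming the default pickle save_method and a writable working directory (the real tests mock the file IO and the data labeler instead; the file name here is illustrative):

    import dataprofiler as dp
    import polars as pl

    profile = dp.UnstructuredProfiler(pl.Series(["this", "is my", "test"]))
    profile.save(filepath="profile.pkl")      # pickle is the default save_method
    loaded = dp.UnstructuredProfiler.load("profile.pkl")
    save_report, load_report = profile.report(), loaded.report()  # expected to match
    # both must remain usable after the round trip
    profile.update_profile(pl.DataFrame(["test", "test2"]))
    loaded.update_profile(pl.DataFrame(["test", "test2"]))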
@@ -4198,8 +4136,8 @@ def test_save_and_load_no_labeler(self):
         self.assertEqual(save_sample, load_sample)

         # validate both are still usable after
-        save_profile.update_profile(pd.DataFrame(["test", "test2"]))
-        load_profile.update_profile(pd.DataFrame(["test", "test2"]))
+        save_profile.update_profile(pl.DataFrame(["test", "test2"]))
+        load_profile.update_profile(pl.DataFrame(["test", "test2"]))

     def test_options_ingested_correctly(self):
         self.assertIsInstance(self.profiler.options, UnstructuredOptions)
@@ -4234,7 +4172,7 @@ def setUpClass(cls):
             "tf_null": [None, 1, None, 2] * 5,
         }

-        cls.data = pd.DataFrame(data)
+        cls.data = pl.DataFrame(data)

         profiler_options_hll = ProfilerOptions()
         profiler_options_hll.set(
@@ -4279,7 +4217,7 @@ def test_adding_profiles_of_mismatched_null_count_options(self):
                 "row_statistics.null_count.is_enabled": False,
             }
         )
-        data = pd.DataFrame([1, None, 3, 4, 5, None, 1])
+        data = pl.DataFrame([1, None, 3, 4, 5, None, 1])
         with test_utils.mock_timeit():
             profiler_w_null_count = dp.StructuredProfiler(
                 data[:2], options=profiler_options_null_count
@@ -4304,7 +4242,7 @@ def test_profile_null_count_not_enabled(self):
                 "row_statistics.null_count.is_enabled": False,
             }
         )
-        data = pd.DataFrame([1, None, 3, 4, 5, None, 1])
+        data = pl.DataFrame([1, None, 3, 4, 5, None, 1])
         with test_utils.mock_timeit():
             profiler_w_disabled_null_count = dp.StructuredProfiler(
                 data[2:], options=profiler_options_null_disabled
@@ -4316,9 +4254,9 @@ def test_correct_rows_ingested(self):
         test_dict = {
             "1": ["nan", "null", None, None, ""],
-            1: ["nan", "None", "null", None, ""],
+            "2": ["nan", "None", "null", None, ""],
         }
-        test_dataset = pd.DataFrame(data=test_dict)
+        test_dataset = pl.DataFrame(data=test_dict)
         profiler_options = ProfilerOptions()
         profiler_options.set(
             {
@@ -4351,7 +4289,7 @@ def test_correct_null_row_counts(self):
         file_path = os.path.join(test_root_path, "data", "csv/empty_rows.txt")
-        data = pd.read_csv(file_path)
+        data = pl.read_csv(file_path)
         profiler_options = ProfilerOptions()
         profiler_options.set(
             {
@@ -4366,7 +4304,7 @@ def test_correct_null_row_counts(self):
         self.assertEqual(0.25, profile._get_row_is_null_ratio())

         file_path = os.path.join(test_root_path, "data", "csv/iris-with-null-rows.csv")
-        data = pd.read_csv(file_path)
+        data = pl.read_csv(file_path)
         profile = dp.StructuredProfiler(data, options=profiler_options)
         self.assertEqual(13, profile.row_has_null_count)
         self.assertEqual(13 / 24, profile._get_row_has_null_ratio())
@@ -4378,7 +4316,7 @@ def test_row_is_null_ratio_row_stats_disabled(self):
         profiler_options_1.set(
             {"*.is_enabled": False, "row_statistics.null_count.is_enabled": False}
         )
-        profiler = StructuredProfiler(pd.DataFrame([]), options=profiler_options_1)
+        profiler = StructuredProfiler(pl.DataFrame([]), options=profiler_options_1)
         self.assertIsNone(profiler._get_row_is_null_ratio())

     def test_row_has_null_ratio_row_stats_disabled(self):
@@ -4388,7 +4326,7 @@ def test_row_has_null_ratio_row_stats_disabled(self):
                 "*.is_enabled": False,
             }
         )
-        profiler = StructuredProfiler(pd.DataFrame([]), options=profiler_options_1)
+        profiler = StructuredProfiler(pl.DataFrame([]), options=profiler_options_1)
         self.assertIsNone(profiler._get_row_has_null_ratio())

     def test_null_in_file(self):
@@ -4430,7 +4368,7 @@ def test_correct_total_sample_size_and_counts_and_mutability(self):
             [None, None],
             ["test7", 7.0],
         ]
-        data = pd.DataFrame(data, columns=["NAME", "VALUE"])
+        data = pl.DataFrame(data, schema=["NAME", "VALUE"])
         profiler_options = ProfilerOptions()
         profiler_options.set(
             {
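The null-row statistics asserted in test_correct_null_row_counts above and in test_null_calculation_with_differently_sampled_cols below reduce to two horizontal aggregations: a row "has null" if any cell is null, and "is null" if every cell is null. A polars sketch of the expected ratios (illustrative, not the profiler's internal code):

    import polars as pl

    df = pl.DataFrame(
        {
            "full": [1, 2, 3, 4],
            "sparse": [1, None, 3, None],
            "sparser": [None, None, 3, None],
        }
    )
    has_null = df.select(pl.any_horizontal(pl.all().is_null())).to_series().sum()
    is_null = df.select(pl.all_horizontal(pl.all().is_null())).to_series().sum()
    row_has_null_ratio = has_null / df.height   # 3 / 4
    row_is_null_ratio = is_null / df.height     # 0 / 4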
@@ -4475,7 +4413,7 @@ def test_null_calculation_with_differently_sampled_cols(self):
                 "row_statistics.is_enabled": True,
             }
         )
-        data = pd.DataFrame(
+        data = pl.DataFrame(
             {
                 "full": [1, 2, 3, 4, 5, 6, 7, 8, 9],
                 "sparse": [1, None, 3, None, 5, None, 7, None, 9],
@@ -4495,7 +4433,7 @@ def test_null_calculation_with_differently_sampled_cols(self):
         self.assertEqual(0, profile._get_row_is_null_ratio())
         self.assertEqual(0.4, profile._get_row_has_null_ratio())

-        data2 = pd.DataFrame(
+        data2 = pl.DataFrame(
             {
                 "sparse": [1, None, 3, None, 5, None, 7, None],
                 "sparser": [1, None, None, None, None, None, None, 8],
@@ -4515,8 +4453,8 @@ def test_null_calculation_with_differently_sampled_cols(self):
         self.assertEqual(1, profile2._get_row_has_null_ratio())

     def test_null_row_stats_correct_after_updates(self, *mocks):
-        data1 = pd.DataFrame([[1, None], [1, 1], [None, None], [None, 1]])
-        data2 = pd.DataFrame([[None, None], [1, None], [None, None], [None, 1]])
+        data1 = pl.DataFrame([[1, None], [1, 1], [None, None], [None, 1]], orient="row")
+        data2 = pl.DataFrame([[None, None], [1, None], [None, None], [None, 1]], orient="row")
         opts = ProfilerOptions()
         opts.set(
             {
@@ -4581,7 +4519,7 @@ def test_null_row_stats_correct_after_updates(self, *mocks):
         )

         # Test that update with empty data doesn't change stats
-        profile.update_profile(pd.DataFrame([]))
+        profile.update_profile(pl.DataFrame([]))
         self.assertEqual(7, profile.row_has_null_count)
         self.assertEqual(3, profile.row_is_null_count)
         self.assertEqual(0.875, profile._get_row_has_null_ratio())
@@ -4595,7 +4533,7 @@ def test_null_row_stats_correct_after_updates(self, *mocks):
         )

         # Test one row update
-        profile.update_profile(pd.DataFrame([[1, None]]))
+        profile.update_profile(pl.DataFrame([[1, None]], orient="row"))
         self.assertEqual(8, profile.row_has_null_count)
         self.assertEqual(3, profile.row_is_null_count)
         self.assertEqual(8 / 9, profile._get_row_has_null_ratio())
@@ -4625,7 +4563,7 @@ def test_null_row_stats_correct_after_updates(self, *mocks):

     def test_list_data_with_hll(self):

-        data = pd.DataFrame(
+        data = pl.DataFrame(
             {"a": [1, 1, 4, 4, 3, 1, None], "b": [1, None, 3, 4, 4, None, 1]}
         )
         # test hll_row_hashing
@@ -4644,7 +4582,7 @@ def test_list_data_with_hll(self):
         self.assertEqual(6, profiler.hashed_row_object.cardinality())

     def test_add_profilers_row_statistics_options(self):
-        data = pd.DataFrame([1, None, 3, 4, 5, None, 1])
+        data = pl.DataFrame([1, None, 3, 4, 5, None, 1])

         default_options = ProfilerOptions()
         default_options.set(
@@ -4827,7 +4765,7 @@ def test_unique_row_ratio_unique_count_disabled(self):
                 "row_statistics.unique_count.is_enabled": False,
             }
         )
-        profiler = StructuredProfiler(pd.DataFrame([]), options=profiler_options)
+        profiler = StructuredProfiler(pl.DataFrame([]), options=profiler_options)
         self.assertIsNone(profiler._get_unique_row_ratio())

     def test_unique_row_ratio_empty_profiler(self):
@@ -4838,7 +4776,7 @@ def test_unique_row_ratio_empty_profiler(self):
                 "row_statistics.is_enabled": True,
             }
         )
-        profiler = StructuredProfiler(pd.DataFrame([]), options=profiler_options)
+        profiler = StructuredProfiler(pl.DataFrame([]), options=profiler_options)
         self.assertEqual(0, profiler._get_unique_row_ratio())

     def test_null_count_empty_profiler(self):
@@ -4849,7 +4787,7 @@ def test_null_count_empty_profiler(self):
                 "row_statistics.null_count.is_enabled": False,
             }
         )
-        profiler = StructuredProfiler(pd.DataFrame([]), options=profiler_options)
+        profiler = StructuredProfiler(pl.DataFrame([]), options=profiler_options)
         self.assertIsNone(profiler._get_row_is_null_ratio())
         self.assertIsNone(profiler._get_row_has_null_ratio())
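The unique/duplicate row statistics exercised in test_list_data_with_hll above and in the hunks below never store rows: each serialized row is hashed into a HyperLogLog, and duplicates are taken as total samples minus the cardinality estimate. A minimal sketch with the same HLL package the builder imports (the precision argument and row serialization are illustrative):

    from HLL import HyperLogLog

    rows = ["1,1", "1,None", "4,3", "4,4", "3,4", "1,None", "None,1"]
    hll = HyperLogLog(12)            # 2**12 registers
    for row in rows:
        hll.add(row)                 # hash the serialized row
    estimated_unique = hll.cardinality()
    estimated_duplicates = max(0, len(rows) - round(estimated_unique))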
"row_statistics.unique_count.is_enabled": False, } ) - profiler = StructuredProfiler(pd.DataFrame([]), options=profiler_options_1) + profiler = StructuredProfiler(pl.DataFrame([]), options=profiler_options_1) self.assertIsNone(profiler._get_duplicate_row_count()) def test_duplicate_row_count_empty_profiler(self): @@ -4883,7 +4821,7 @@ def test_duplicate_row_count_empty_profiler(self): "row_statistics.is_enabled": True, } ) - profiler = StructuredProfiler(pd.DataFrame([]), options=profiler_options) + profiler = StructuredProfiler(pl.DataFrame([]), options=profiler_options) self.assertEqual(0, profiler._get_duplicate_row_count()) def test_duplicate_row_count_hll_cardinality_greater_than_total_samples(self): @@ -4901,12 +4839,13 @@ def test_duplicate_row_count_hll_cardinality_greater_than_total_samples(self): spec=dataprofiler.profilers.profile_builder.HyperLogLog, ) as hll_mock: hll_mock.return_value.cardinality.return_value = 1000 - profiler = StructuredProfiler(pd.DataFrame([]), options=profiler_options) + profiler = StructuredProfiler(pl.DataFrame([]), options=profiler_options) self.assertEqual(1000, profiler.hashed_row_object.cardinality()) self.assertEqual(0, profiler._get_duplicate_row_count()) def test_save_and_load_hll(self): + print(self.trained_schema_hll.hashed_row_object) self.assertEqual(15, self.trained_schema_hll.hashed_row_object.cardinality()) # Save and Load profile with Mock IO @@ -4935,7 +4874,7 @@ def test_save_and_load_hll(self): # validate both are still usable after # first row should be unique, second should be duplicate self.trained_schema_hll.update_profile( - pd.DataFrame( + pl.DataFrame( { "names": ["hello", "orange"], "numbers": [5, 1], @@ -4944,7 +4883,7 @@ def test_save_and_load_hll(self): ) ) load_profile.update_profile( - pd.DataFrame( + pl.DataFrame( { "names": ["hello", "orange"], "numbers": [5, 1], @@ -4963,12 +4902,12 @@ def test_profiler_factory_class_bad_input(self): ValueError, "Must specify 'profiler_type' to be 'graph', 'structured' or 'unstructured'.", ): - Profiler(pd.DataFrame([]), profiler_type="whoops") + Profiler(pl.DataFrame([]), profiler_type="whoops") with self.assertRaisesRegex( ValueError, "Data must either be imported using the " - "data_readers, nx.Graph, pd.Series, or pd.DataFrame.", + "data_readers, nx.Graph, pl.Series, or pl.DataFrame.", ): Profiler({"test": 1}) @@ -4986,7 +4925,7 @@ def test_profiler_factory_class_creates_correct_profiler(self, *mocks): reasonable inference in the absence of user specificity. 
""" # User specifies via profiler_type - data_df = pd.DataFrame(["test"]) + data_df = pl.DataFrame(["test"]) data_graph = nx.Graph() self.assertIsInstance( @@ -5009,8 +4948,8 @@ def test_profiler_factory_class_creates_correct_profiler(self, *mocks): ) self.assertIsInstance(Profiler(data_csv_rec), UnstructuredProfiler) - # user gives structured: list, pd.Series, pd.DataFrame - data_series = pd.Series(["test"]) + # user gives structured: list, pl.Series, pl.DataFrame + data_series = pl.Series(["test"]) data_list = ["test"] self.assertIsInstance(Profiler(data_list), StructuredProfiler) self.assertIsInstance(Profiler(data_series), StructuredProfiler) @@ -5134,8 +5073,8 @@ def test_save_and_load_unstructured(self): self.assertDictEqual(save_report, load_report) # validate both are still usable after - save_profile.update_profile(pd.DataFrame(["test", "test2"])) - load_profile.update_profile(pd.DataFrame(["test", "test2"])) + save_profile.update_profile(pl.DataFrame(["test", "test2"])) + load_profile.update_profile(pl.DataFrame(["test", "test2"])) @mock.patch( "dataprofiler.profilers.profile_builder.StructuredProfiler." @@ -5143,7 +5082,7 @@ def test_save_and_load_unstructured(self): ) @mock.patch("dataprofiler.profilers.profile_builder.DataLabeler") def test_min_true_samples(self, *mocks): - empty_df = pd.DataFrame([]) + empty_df = pl.DataFrame([]) # Test invalid input msg = "`min_true_samples` must be an integer or `None`."