From 6b9c5ff0d6e4623f615a0b460d1cf5aa2bea686e Mon Sep 17 00:00:00 2001
From: Pawween
Date: Mon, 14 Oct 2024 03:12:13 +0000
Subject: [PATCH] Deduplicate DOIs per chunk and simplify last-processed-date calculation

---
 stages/python/01_get_openaccess.py | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/stages/python/01_get_openaccess.py b/stages/python/01_get_openaccess.py
index f3645dc..d5605a5 100644
--- a/stages/python/01_get_openaccess.py
+++ b/stages/python/01_get_openaccess.py
@@ -68,15 +68,16 @@ def process_file(file_info):
     for chunk in df:
         chunk['publication_date'] = pd.to_datetime(chunk['publication_date'])
         chunk['url'] = chunk['best_oa_location'].apply(lambda x: x.get('pdf_url') if isinstance(x, dict) else None)
-        chunk['url'] = chunk['url'].str.split(',').str[0].str.strip()
 
         filtered_chunk = chunk[(chunk['url'].notna()) &
                                (chunk['url'] != 'null') &
                                (chunk['publication_date'] >= pd.to_datetime(last_processed_date)) &
                                (chunk['doi'].str.contains('doi.org', case=False, na=False))][['doi', 'url', 'publication_date']]
+
-        #result_df = pd.concat([result_df, filtered_chunk], ignore_index=True)
-        #total_size += filtered_chunk.memory_usage(deep=True).sum()
+        filtered_chunk = filtered_chunk.drop_duplicates(subset='doi', keep='first')
+        #filtered_chunk = filtered_chunk.drop_duplicates(subset='title', keep='first')
+
         if outpath.exists():
             filtered_chunk.to_parquet(outpath, engine='fastparquet', compression='snappy', append=True)
@@ -92,15 +93,11 @@ def process_file(file_info):
 # Calculate the new processed date
-all_parquet_files = list(raw_path.glob('group_*.parquet'))
+all_parquet_files = list(raw_path.glob('*.parquet'))
 if all_parquet_files:
-    max_date = None
-    for file in all_parquet_files:
-        df = pd.read_parquet(file, columns=['publication_date'])
-        file_max_date = df['publication_date'].max()
-        if max_date is None or file_max_date > max_date:
-            max_date = file_max_date
-    new_processed_date = max_date.date() if max_date else last_processed_date
+    df_list = [pd.read_parquet(file) for file in all_parquet_files]
+    combined_df = pd.concat(df_list, ignore_index=True)
+    new_processed_date = combined_df['publication_date'].max().date()
     save_last_processed_date(new_processed_date)
     print(f"Processed publications from {last_processed_date} to: {new_processed_date}")
 else:
@@ -109,6 +106,7 @@ def process_file(file_info):
 save_last_processed_date(new_processed_date)
+
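
A note on the new per-chunk deduplication: drop_duplicates(subset='doi', keep='first')
removes repeats within a single chunk only, so a DOI that reappears in a later chunk
(or a later input file) is still appended to the existing parquet output. If
cross-chunk uniqueness is wanted, one option is to carry a set of already-written
DOIs across calls. A minimal sketch, assuming a hypothetical seen_dois set and
helper name (neither is part of this patch):

    import pandas as pd
    from pathlib import Path

    # Hypothetical helper: drop rows whose DOI was already written by an
    # earlier chunk or file, then append only the remainder to the output.
    def append_unique(filtered_chunk: pd.DataFrame, outpath: Path, seen_dois: set) -> None:
        fresh = filtered_chunk[~filtered_chunk['doi'].isin(seen_dois)]
        seen_dois.update(fresh['doi'])
        if fresh.empty:
            return
        if outpath.exists():
            # fastparquet can append row groups to an existing file.
            fresh.to_parquet(outpath, engine='fastparquet', compression='snappy', append=True)
        else:
            fresh.to_parquet(outpath, engine='fastparquet', compression='snappy')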
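
The simplified date calculation concatenates every output file in full before taking
the maximum, which loads all columns into memory at once. If the outputs grow large,
the column-pruned read from the replaced loop can be kept without the manual
running-max bookkeeping. A sketch under that assumption, reusing all_parquet_files
from the diff above:

    import pandas as pd

    # Read only the publication_date column from each file and take the
    # per-file maximum; pd.Series(...).max() skips NaT from empty files.
    dates = [
        pd.read_parquet(f, columns=['publication_date'])['publication_date'].max()
        for f in all_parquet_files
    ]
    new_processed_date = pd.Series(dates).max().date()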