Commit

modified
Pawween committed Oct 14, 2024
1 parent 02cc6a4 commit 6b9c5ff
Showing 1 changed file with 9 additions and 11 deletions.
20 changes: 9 additions & 11 deletions stages/python/01_get_openaccess.py
@@ -68,15 +68,16 @@ def process_file(file_info):
for chunk in df:
    chunk['publication_date'] = pd.to_datetime(chunk['publication_date'])
    chunk['url'] = chunk['best_oa_location'].apply(lambda x: x.get('pdf_url') if isinstance(x, dict) else None)

    chunk['url'] = chunk['url'].str.split(',').str[0].str.strip()

    filtered_chunk = chunk[(chunk['url'].notna()) & (chunk['url'] != 'null') &
                           (chunk['publication_date'] >= pd.to_datetime(last_processed_date)) &
                           (chunk['doi'].str.contains('doi.org', case=False, na=False))][['doi', 'url', 'publication_date']]


    #result_df = pd.concat([result_df, filtered_chunk], ignore_index=True)
    #total_size += filtered_chunk.memory_usage(deep=True).sum()
    filtered_chunk = filtered_chunk.drop_duplicates(subset='doi', keep='first')
    #filtered_chunk = filtered_chunk.drop_duplicates(subset='title', keep='first')


    if outpath.exists():
        filtered_chunk.to_parquet(outpath, engine='fastparquet', compression='snappy', append=True)
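
For illustration, a minimal self-contained sketch of the filtering step above on a toy chunk. The records, URLs and dates are invented; only the column names and the filter conditions mirror the diff:

import pandas as pd

# Toy chunk with the columns the script expects (sample data is invented).
chunk = pd.DataFrame({
    'doi': ['https://doi.org/10.1/abc', 'not-a-doi', 'https://doi.org/10.2/def'],
    'best_oa_location': [{'pdf_url': 'https://host.org/a.pdf, https://mirror.org/a.pdf'}, None, {'pdf_url': None}],
    'publication_date': ['2024-09-01', '2024-09-02', '2023-01-01'],
})
last_processed_date = '2024-01-01'

chunk['publication_date'] = pd.to_datetime(chunk['publication_date'])
chunk['url'] = chunk['best_oa_location'].apply(lambda x: x.get('pdf_url') if isinstance(x, dict) else None)
# Keep only the first URL when several are packed into one comma-separated string.
chunk['url'] = chunk['url'].str.split(',').str[0].str.strip()

filtered_chunk = chunk[(chunk['url'].notna()) & (chunk['url'] != 'null') &
                       (chunk['publication_date'] >= pd.to_datetime(last_processed_date)) &
                       (chunk['doi'].str.contains('doi.org', case=False, na=False))][['doi', 'url', 'publication_date']]
print(filtered_chunk)  # one surviving row: recent date, doi.org DOI, usable pdf_url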
@@ -92,15 +92,11 @@ def process_file(file_info):


# Calculate the new processed date
all_parquet_files = list(raw_path.glob('group_*.parquet'))
all_parquet_files = list(raw_path.glob('*.parquet'))
if all_parquet_files:
    max_date = None
    for file in all_parquet_files:
        df = pd.read_parquet(file, columns=['publication_date'])
        file_max_date = df['publication_date'].max()
        if max_date is None or file_max_date > max_date:
            max_date = file_max_date
    new_processed_date = max_date.date() if max_date else last_processed_date
    df_list = [pd.read_parquet(file) for file in all_parquet_files]
    combined_df = pd.concat(df_list, ignore_index=True)
    new_processed_date = combined_df['publication_date'].max().date()
    save_last_processed_date(new_processed_date)
    print(f"Processed publications from {last_processed_date} to: {new_processed_date}")
else:
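
Both variants in the hunk above recover the newest publication_date from the parquet files already written: one scans them file by file reading a single column, the other concatenates whole files before taking the maximum. A minimal sketch of the column-only scan, assuming a hypothetical raw_path and that publication_date is stored as a datetime column:

import pandas as pd
from pathlib import Path

raw_path = Path('data/raw')  # hypothetical location; the diff does not show where raw_path is defined

max_date = None
for file in raw_path.glob('*.parquet'):
    # Read only the one column that matters, keeping memory bounded even for large files.
    dates = pd.read_parquet(file, columns=['publication_date'])['publication_date']
    file_max = dates.max()
    if pd.notna(file_max) and (max_date is None or file_max > max_date):
        max_date = file_max

new_processed_date = max_date.date() if max_date is not None else None
print(new_processed_date)

Reading a single column per file avoids materialising every record just to take a maximum, which matters once the per-group parquet files grow large.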
@@ -109,6 +106,7 @@ def process_file(file_info):

save_last_processed_date(new_processed_date)
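
save_last_processed_date (and the loader that produces last_processed_date) are defined elsewhere in the script and are not part of this diff. Purely as an assumption about their role, a checkpoint pair along these lines would fit how they are used here; the file name and format are invented:

from datetime import date
from pathlib import Path

CHECKPOINT = Path('last_processed_date.txt')  # hypothetical checkpoint file

def save_last_processed_date(d: date) -> None:
    # Persist the newest publication date seen, so the next run can resume from it.
    CHECKPOINT.write_text(d.isoformat())

def load_last_processed_date(default: str = '1900-01-01') -> date:
    # Fall back to an early default when no checkpoint exists yet.
    if CHECKPOINT.exists():
        return date.fromisoformat(CHECKPOINT.read_text().strip())
    return date.fromisoformat(default)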





