incorporated remove_duplicates
Pawween committed Oct 17, 2024
1 parent d55bfb5 commit 5141f0d
Showing 1 changed file with 48 additions and 18 deletions.
stages/python/01_get_openaccess.py: 66 changes (48 additions & 18 deletions)
@@ -46,7 +46,7 @@ def process_file(file_info):
     outfile = f"url_{group:02d}.parquet"
     outpath = raw_path / outfile
 
-    df = pd.read_json(filename, lines=True, chunksize=50000000)
+    df = pd.read_json(filename, lines=True, chunksize=10000000)
 
     for chunk in df:
         chunk['publication_date'] = pd.to_datetime(chunk['publication_date'])
@@ -57,7 +57,7 @@ def process_file(file_info):
         chunk['is_oa'] = chunk['open_access'].apply(lambda x: x.get('is_oa') if isinstance(x, dict) else None)
 
         # extract journal name, authors, topic areas and themes, and keywords
-        chunk['journal_name'] = chunk['primary_location'].apply(lambda x: x.get('source', {}).get('display_name') if isinstance(x, dict) else None)
+        #chunk['journal_name'] = chunk['primary_location'].apply(lambda x: x.get('source', {}).get('display_name') if isinstance(x, dict) else None)
         chunk['authors'] = chunk['authorships'].apply(lambda x: ', '.join([author['author']['display_name'] for author in x]) if isinstance(x, list) else None)
         chunk['themes'] = chunk['topics'].apply(lambda x: ', '.join([topic['display_name'] for topic in x]) if isinstance(x, list) else None)
         chunk['keywd'] = chunk['keywords'].apply(lambda x: ', '.join([keyword['display_name'] for keyword in x]) if isinstance(x, list) else None)
@@ -67,29 +67,25 @@ def process_file(file_info):
         for col in ['volume', 'issue']:
             chunk[col] = chunk['biblio'].apply(lambda x: x.get(col) if isinstance(x, dict) else None)
 
+        # metadata features
+        selected_columns = [
+            'id', 'doi', 'url','type', 'type_crossref', 'publication_date', 'title',
+            'is_oa', 'authors', 'areas', 'themes', 'keywd', 'volume', 'issue', 'language'
+        ]
+
         # filtering condition
-        conditions = (chunk[(chunk['url'].notna()) & (chunk['url'] != 'null') &
+        filtered_chunk = chunk[(chunk['url'].notna()) & (chunk['url'] != 'null') &
             (chunk['publication_date'] >= pd.to_datetime(last_processed_date)) &
             (chunk['doi'].str.contains('doi.org', case=False, na=False)) &
             (chunk['title'].notna()) & (chunk['title'] != 'null') &
             (chunk['authors'].notna()) & (chunk['authors'] != 'null') &
-            (chunk['is_retracted'] == False) &
-            (chunk['is_paratext'] == False))
-
-        # metadata features
-        selected_columns = [
-            'id', 'doi', 'url','type', 'type_crossref',
-            'publication_date', 'journal_name', 'title',
-            'is_oa', 'authors', 'areas', 'themes', 'keywd', 'volume',
-            'issue', 'language'
-        ]
-
-        filtered_chunk = chunk[conditions][selected_columns]
+            (chunk['is_retracted'] == False) & (chunk['is_paratext'] == False)][selected_columns]
 
-        #noticed a couple files ran into errors right at the beginning
-        #problems solved after overwritting them (nothing happened afterward)
+        filtered_chunk = filtered_chunk.drop_duplicates(subset='doi', keep='first')
+
+        #noticed a couple files ran into errors, probably corrupted, right when starting the run
+        #problems solved after overwriting them (nothing happened afterward; it looked like it was stuck only at the beginning)
         if outpath.exists():
             try:
                 filtered_chunk.to_parquet(outpath, engine='fastparquet', compression='snappy', append=True, index=False)
@@ -107,8 +103,38 @@ def save_last_processed_date(processed_date):
         file.write(processed_date.strftime("%Y-%m-%d"))
 
 
+# Check the output parquet files and remove duplicates
+
+
 #### Execution ######
+def remove_duplicates(parquet_dir):
+    parquet_dir = Path(parquet_dir)
+    parquet_files = parquet_dir.glob('*.parquet')
+
+    seen_dois = set()
+
+    # Iterate through all parquet files
+    for file in parquet_files:
+        df = pd.read_parquet(file)
+
+        # collect DOIs that have not appeared in an earlier file
+        unique_dois = []
+
+        for doi in df['doi']:
+            if doi not in seen_dois:
+                unique_dois.append(doi)
+                seen_dois.add(doi)
+
+        df_unique = df[df['doi'].isin(unique_dois)]
+
+        # drop duplicates within the file as well
+        df_drop = df_unique.drop_duplicates(subset='doi', keep='first')
+
+        df_drop.to_parquet(file, index=False)
+
+    print("\nDuplicate DOIs have been removed.")
+
+
+
 # retrieve last processed date or an arbitrary fallback date
 last_processed_date = get_last_processed_date()
@@ -123,7 +149,7 @@ def save_last_processed_date(processed_date):
 
 # Filter files by last processed date
 filtered_files = [(key, date) for key, date in s3_run('openalex', 'data/works/') if date >= last_processed_date]
-#data = [(key, date) for key, date in s3_run('openalex', 'data/works/')]
+
 
 # Process files using multiprocessors
 with ProcessPoolExecutor(max_workers=numfile) as executor:
@@ -143,4 +169,8 @@ def save_last_processed_date(processed_date):
     print("No new data processed.")
     new_processed_date = last_processed_date
 
-save_last_processed_date(new_processed_date)
+save_last_processed_date(new_processed_date)
+
+
+# Check the output parquet files and remove duplicates
+remove_duplicates(raw_path)
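
The append path at the end of the third hunk (to_parquet with engine='fastparquet' and append=True) only works against an existing fastparquet file, which is why it sits behind the outpath.exists() check. A minimal standalone sketch of that append-or-create flow, assuming fastparquet is installed; the except branch is an assumption, since the script's actual error handling falls outside the shown diff context:

from pathlib import Path
import pandas as pd

outpath = Path('url_00.parquet')   # illustrative path
chunk = pd.DataFrame({'doi': ['10.1/a'], 'title': ['example']})

if outpath.exists():
    try:
        # fastparquet can append row groups to an existing file
        chunk.to_parquet(outpath, engine='fastparquet', compression='snappy', append=True, index=False)
    except Exception as err:
        # assumed fallback: overwrite the (possibly corrupted) file
        print(f"append failed ({err}); overwriting {outpath}")
        chunk.to_parquet(outpath, engine='fastparquet', compression='snappy', index=False)
else:
    chunk.to_parquet(outpath, engine='fastparquet', compression='snappy', index=False)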
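
The new remove_duplicates pass keeps the first occurrence of each DOI across all output files by carrying a seen_dois set from file to file. A minimal sketch of the same pattern on hypothetical in-memory frames; the boolean-mask form here should behave the same as the commit's explicit loop over df['doi']:

import pandas as pd

seen_dois = set()
frames = [
    pd.DataFrame({'doi': ['10.1/a', '10.1/b', '10.1/a']}),  # duplicate within one file
    pd.DataFrame({'doi': ['10.1/b', '10.1/c']}),            # duplicate across files
]

for i, df in enumerate(frames):
    # keep rows whose DOI has not been seen in an earlier frame,
    # then drop repeats within the frame itself
    kept = df[~df['doi'].isin(seen_dois)].drop_duplicates(subset='doi', keep='first')
    seen_dois.update(kept['doi'])
    print(f"frame {i}: kept {list(kept['doi'])}")

# frame 0: kept ['10.1/a', '10.1/b']
# frame 1: kept ['10.1/c']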
