removed redundant wrapping functions, cleaned up, incorporated comments
mahinth1 committed Oct 23, 2024
1 parent 2c91b1b commit 87dcbdc
Showing 2 changed files with 27 additions and 44 deletions.
stages/01_get_openaccess.py (17 changes: 5 additions & 12 deletions)
@@ -14,7 +14,7 @@
import shutil


#####vFunctions #####
#####Functions #####

# Get last processed date if applicable
def get_last_processed_date():
@@ -82,12 +82,9 @@ def process_file(file_info):
(chunk['authors'].notna()) & (chunk['authors'] != 'null') &
(chunk['is_retracted'] == False) & (chunk['is_paratext'] == False)][selected_columns]


filtered_chunk = filtered_chunk.drop_duplicates(subset='doi', keep='first')


# handle corrupted existing output files and decide when to append new data

temp_outpath = outpath.with_suffix('.tmp')
if outpath.exists():
try:
@@ -104,11 +101,6 @@ def process_file(file_info):
if temp_outpath.exists():
temp_outpath.rename(outpath)

# Save last processed date into a file
def save_last_processed_date(processed_date):
with open('last_processed_date.txt', 'w') as file:
file.write(processed_date.strftime("%Y-%m-%d"))


# Check the output parquet files and remove duplicates
def remove_duplicates(parquet_dir):
@@ -167,14 +159,15 @@ def remove_duplicates(parquet_dir):
df_list = [pd.read_parquet(file) for file in all_parquet_files]
combined_df = pd.concat(df_list, ignore_index=True)
new_processed_date = combined_df['publication_date'].max().date()
save_last_processed_date(new_processed_date)

with open('last_processed_date.txt', 'w') as file:
file.write(new_processed_date.strftime("%Y-%m-%d"))

print(f"Processed publications from {last_processed_date} to: {new_processed_date}")
else:
print("No new data processed.")
new_processed_date = last_processed_date

save_last_processed_date(new_processed_date)


# Check the output parquet files and remove duplicates
remove_duplicates(raw_path)
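
For context on this first file: the removed save_last_processed_date helper was a one-call wrapper around a two-line file write, and the commit inlines that write next to where new_processed_date is computed. Below is a minimal standalone sketch of the date bookkeeping; the read side is an assumption, since the body of get_last_processed_date is not part of this diff, and wrapping the write back into a function here is purely so the round trip is self-contained.

from datetime import date, datetime
from pathlib import Path

DATE_FILE = Path('last_processed_date.txt')  # filename taken from the diff

def read_last_processed_date():
    # assumed read side: return the stored date, or None on the first run
    if DATE_FILE.exists():
        return datetime.strptime(DATE_FILE.read_text().strip(), "%Y-%m-%d").date()
    return None

def write_last_processed_date(new_processed_date: date):
    # the commit inlines these two lines at the call site instead of keeping a wrapper
    with open(DATE_FILE, 'w') as file:
        file.write(new_processed_date.strftime("%Y-%m-%d"))
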
stages/02_download.py (54 changes: 22 additions & 32 deletions)
@@ -13,32 +13,8 @@
dotenv.load_dotenv()
scraperapi_key = os.getenv("SCRAPERAPI_KEY")

#scraperapi_key = "your scraper api key"

# define metadata columns or descriptors
metadata_columns = [
'id', 'doi', 'url', 'type', 'type_crossref', 'publication_date', 'journal', 'publisher',
'title', 'is_oa', 'authors', 'areas', 'themes', 'keywd', 'volume', 'issue', 'language',
'content_hash', 'file_path'
]

##### Functions #####

# Load existing metadata from a parquet file if applicable
def load_existing_metadata(metadata_output_dir, file_stem):
return pd.read_parquet(metadata_output_dir / f"{file_stem}_pdfs.parquet")

# Save new metadata to a parquet file
def save_metadata(metadata_df, metadata_output_dir, file_stem):
output_file = metadata_output_dir / f"{file_stem}_pdfs.parquet"
if output_file.exists():
existing_df = pd.read_parquet(output_file)
combined_df = pd.concat([existing_df, metadata_df], ignore_index=True)
combined_df.to_parquet(output_file, index=False)
else:
metadata_df.to_parquet(output_file, index=False)


# download pdf from url
def download_pdf(url, file_output_dir, downloaded_hashes):
response = requests.get(f"http://api.scraperapi.com?api_key={scraperapi_key}&url={url}&render=true")
@@ -52,6 +28,7 @@ def download_pdf(url, file_output_dir, downloaded_hashes):
if content_hash in downloaded_hashes:
return None, None

# skip files that are too small
if len(content) < 1024:
return None, None

@@ -63,6 +40,7 @@ def download_pdf(url, file_output_dir, downloaded_hashes):
with outfile_path.open('wb') as file:
file.write(content)

# check if the file is a valid PDF
if content.startswith(b'%PDF-'):
try:
with outfile_path.open('rb') as file:
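
The truncated hunks above show only fragments of download_pdf; read together, the flow is: fetch the URL through ScraperAPI, hash the response body, skip content that was already downloaded or is too small to be a real document, write it to disk, and keep it only if it actually parses as a PDF. A rough sketch of that flow follows, under stated assumptions: hashing with hashlib and validating with pypdf are guesses, since the diff does not show which hashing or PDF library the script uses, and the output filename scheme is invented for illustration.

import hashlib
from pathlib import Path

import requests
from pypdf import PdfReader

def download_pdf_sketch(url, file_output_dir, downloaded_hashes, scraperapi_key):
    # fetch through the ScraperAPI proxy, as in the diff
    response = requests.get(
        f"http://api.scraperapi.com?api_key={scraperapi_key}&url={url}&render=true"
    )
    content = response.content

    # deduplicate on a hash of the raw bytes (hash function is an assumption)
    content_hash = hashlib.sha256(content).hexdigest()
    if content_hash in downloaded_hashes:
        return None, None

    # skip responses that are too small (the 1 KB cutoff comes from the diff)
    if len(content) < 1024:
        return None, None

    # write to disk, then keep the file only if it is a parseable PDF
    outfile_path = Path(file_output_dir) / f"{content_hash}.pdf"  # naming scheme is an assumption
    outfile_path.write_bytes(content)
    if content.startswith(b'%PDF-'):
        try:
            PdfReader(str(outfile_path))  # raises if the file cannot be read as a PDF
            downloaded_hashes.add(content_hash)  # recording the hash here is an assumption
            return content_hash, outfile_path
        except Exception:
            pass
    outfile_path.unlink(missing_ok=True)
    return None, None
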
@@ -101,13 +79,20 @@ def extract_metadata(doi):

##### Execution #####

# input and output directories
# set input and output directories
input_dir = Path('brick/articles.parquet')
output_dir = Path('brick/pdfs')
output_dir.mkdir(parents=True, exist_ok=True)
metadata_output_dir = Path('brick/download.parquet')
metadata_output_dir.mkdir(parents=True, exist_ok=True)

# define final metadata columns or descriptors
metadata_columns = [
'id', 'doi', 'url', 'type', 'type_crossref', 'publication_date', 'journal', 'publisher',
'title', 'is_oa', 'authors', 'areas', 'themes', 'keywd', 'volume', 'issue', 'language',
'content_hash', 'file_path'
]

# process each file
for file in input_dir.glob('*.parquet'):
# read parquet file (can be adjusted to read only a subset of the file)
@@ -120,7 +105,8 @@ def extract_metadata(doi):
file_output_dir.mkdir(parents=True, exist_ok=True)

# Get the latest row where content_hash is assigned
existing_metadata = load_existing_metadata(metadata_output_dir, file.stem)
# Start from where it left off (the last downloaded pdf) instead of starting from row 0
existing_metadata = pd.read_parquet(metadata_output_dir / f"{file_stem}_pdfs.parquet")
latest_row = existing_metadata[existing_metadata['content_hash'].notnull()].iloc[-1] if not existing_metadata.empty else None

if latest_row is not None and 'doi' in latest_row:
@@ -167,6 +153,7 @@ def extract_metadata(doi):
# handle issues when no new data is generated
if not results_df.empty:

# descriptors in the input parquet file
original_columns = [
'id', 'doi','type', 'type_crossref', 'publication_date', 'title',
'is_oa', 'authors', 'areas', 'themes', 'keywd', 'volume', 'issue', 'language'
Expand All @@ -177,18 +164,21 @@ def extract_metadata(doi):
metadata_df = pd.merge(results_df, original_df, on='doi', how='left')

# remove rows with no downloaded pdfs
metadata_df = metadata_df.dropna(subset=['content_hash'])
metadata_df = metadata_df.dropna(subset=['file_path'])
metadata_df = metadata_df.dropna(subset=['content_hash', 'file_path'])

# select columns and remove duplicates
metadata_df = metadata_df[metadata_columns]
metadata_df = metadata_df.drop_duplicates(subset=['id'])
metadata_df = metadata_df.drop_duplicates(subset=['doi'])

# save new metadata
if not metadata_df.empty:
output_file = metadata_output_dir / f"{file_stem}_pdfs.parquet"
save_metadata(metadata_df, file_output_dir, file_stem)
output_file = metadata_output_dir / f"{file_stem}_pdfs.parquet"
# add updated data to existing file and remove duplicates
if output_file.exists():
existing_df = pd.read_parquet(output_file)
combined_df = pd.concat([existing_df, metadata_df], ignore_index=True).drop_duplicates(subset=['doi', 'content_hash'])
combined_df.to_parquet(output_file, index=False)
else:
metadata_df.to_parquet(output_file, index=False)


