Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Enable control over chunking of URL lists #168

Merged
1 commit merged on Feb 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions deepsearch/cps/data_indices/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def upload_files(
local_file: Optional[Union[str, Path]] = None,
s3_coordinates: Optional[S3Coordinates] = None,
conv_settings: Optional[ConversionSettings] = None,
url_chunk_size: int = 1,
):
"""
Orchestrate document conversion and upload to an index in a project
Expand All @@ -43,7 +44,9 @@ def upload_files(
else:
urls = url

return process_url_input(api=api, coords=coords, urls=urls)
return process_url_input(
api=api, coords=coords, urls=urls, url_chunk_size=url_chunk_size
)
elif url is None and local_file is not None and s3_coordinates is None:
return process_local_file(
api=api,
Expand All @@ -64,27 +67,32 @@ def process_url_input(
api: CpsApi,
coords: ElasticProjectDataCollectionSource,
urls: List[str],
url_chunk_size: int,
progress_bar: bool = False,
):
"""
Individual urls are uploaded for conversion and storage in data index.
"""

chunk_list = lambda lst, n: [lst[i : i + n] for i in range(0, len(lst), n)]

root_dir = create_root_dir()

# container list for task_ids
task_ids = []
# submit urls
count_urls = len(urls)
url_chunks = chunk_list(urls, url_chunk_size)
count_urls = len(url_chunks)
with tqdm(
total=count_urls,
desc=f"{'Submitting input:': <{progressbar.padding}}",
disable=not (progress_bar),
colour=progressbar.colour,
bar_format=progressbar.bar_format,
) as progress:
for url in urls:
file_url_array = [url]

for url_chunk in url_chunks:
file_url_array = url_chunk
payload = {"file_url": file_url_array}
task_id = api.data_indices.upload_file(coords=coords, body=payload)
task_ids.append(task_id)
Expand Down
Loading