diff --git a/deepsearch/cps/data_indices/utils.py b/deepsearch/cps/data_indices/utils.py
index 77a25451..94233bc3 100644
--- a/deepsearch/cps/data_indices/utils.py
+++ b/deepsearch/cps/data_indices/utils.py
@@ -27,6 +27,7 @@ def upload_files(
     local_file: Optional[Union[str, Path]] = None,
     s3_coordinates: Optional[S3Coordinates] = None,
     conv_settings: Optional[ConversionSettings] = None,
+    url_chunk_size: int = 1,
 ):
     """
     Orchestrate document conversion and upload to an index in a project
@@ -43,7 +44,9 @@ def upload_files(
         else:
             urls = url
 
-        return process_url_input(api=api, coords=coords, urls=urls)
+        return process_url_input(
+            api=api, coords=coords, urls=urls, url_chunk_size=url_chunk_size
+        )
     elif url is None and local_file is not None and s3_coordinates is None:
         return process_local_file(
             api=api,
@@ -64,18 +67,22 @@ def process_url_input(
     api: CpsApi,
     coords: ElasticProjectDataCollectionSource,
     urls: List[str],
+    url_chunk_size: int,
     progress_bar: bool = False,
 ):
     """
     Individual urls are uploaded for conversion and storage in data index.
     """
 
+    chunk_list = lambda lst, n: [lst[i : i + n] for i in range(0, len(lst), n)]
+
     root_dir = create_root_dir()
     # container list for task_ids
     task_ids = []
 
     # submit urls
-    count_urls = len(urls)
+    url_chunks = chunk_list(urls, url_chunk_size)
+    count_urls = len(url_chunks)
     with tqdm(
         total=count_urls,
         desc=f"{'Submitting input:': <{progressbar.padding}}",
@@ -83,8 +90,9 @@
         colour=progressbar.colour,
         bar_format=progressbar.bar_format,
     ) as progress:
-        for url in urls:
-            file_url_array = [url]
+
+        for url_chunk in url_chunks:
+            file_url_array = url_chunk
             payload = {"file_url": file_url_array}
             task_id = api.data_indices.upload_file(coords=coords, body=payload)
             task_ids.append(task_id)
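
For reviewers, a minimal usage sketch of the new `url_chunk_size` parameter. The import paths and the `CpsApi.from_env()` constructor are assumptions based on the toolkit's usual layout, and the project key, index key, and URLs are placeholders, not values taken from this change:

    from deepsearch.cps.client import CpsApi  # assumed import path
    from deepsearch.cps.client.components.elastic import (  # assumed import path
        ElasticProjectDataCollectionSource,
    )
    from deepsearch.cps.data_indices import utils as data_indices_utils

    # Assumes credentials are picked up from the environment / a stored profile.
    api = CpsApi.from_env()

    coords = ElasticProjectDataCollectionSource(
        proj_key="<proj-key>",    # placeholder project key
        index_key="<index-key>",  # placeholder index key
    )

    urls = [
        "https://example.com/doc-1.pdf",  # placeholder URLs
        "https://example.com/doc-2.pdf",
        "https://example.com/doc-3.pdf",
    ]

    # With url_chunk_size=2, the three URLs are submitted as two conversion
    # tasks (two URLs, then one) instead of three; the default of 1 keeps the
    # previous one-task-per-URL behaviour.
    data_indices_utils.upload_files(api=api, coords=coords, url=urls, url_chunk_size=2)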