diff --git a/clouddrift/adapters/mosaic.py b/clouddrift/adapters/mosaic.py index 2eb2935f..940f1c0a 100644 --- a/clouddrift/adapters/mosaic.py +++ b/clouddrift/adapters/mosaic.py @@ -59,10 +59,11 @@ def get_dataframes() -> tuple[pd.DataFrame, pd.DataFrame]: range(len(sensor_ids)), key=lambda k: order_index[sensor_ids[k]] ) sorted_data_urls = [data_urls[i] for i in sorted_indices] - buffers = [BytesIO(b"") for _ in range(len(sorted_data_urls))] - requests = [(url, BytesIO(b""), None) for url in sorted_data_urls] + buffers = [BytesIO() for _ in range(len(sorted_data_urls))] + requests = [(url, buffer, None) for url, buffer in zip(sorted_data_urls, buffers)] download_with_progress(requests, desc="Downloading data") + [b.seek(0) for b in buffers] dfs = [pd.read_csv(b) for b in buffers] obs_df = pd.concat(dfs) diff --git a/clouddrift/adapters/utils.py b/clouddrift/adapters/utils.py index 6193a2ef..ee5e5013 100644 --- a/clouddrift/adapters/utils.py +++ b/clouddrift/adapters/utils.py @@ -1,14 +1,14 @@ import concurrent.futures import logging import os -import traceback -from collections.abc import Callable, Sequence from datetime import datetime -from io import BufferedIOBase, StringIO +from io import BufferedIOBase +from typing import Callable, Sequence import requests from requests import Response from tenacity import ( + RetryCallState, WrappedFn, retry, retry_if_exception, @@ -17,17 +17,27 @@ ) from tqdm import tqdm + +def _before_call(rcs: RetryCallState): + if rcs.attempt_number > 1: + src = rcs.args[0] + dst = "io-buffer" if isinstance(rcs.args[1], BufferedIOBase) else rcs.args[1] + _logger.warn( + f"retrying download request for (dst, src): {(src, dst)}, attempt: {rcs.attempt_number}" + ) + + _CHUNK_SIZE = 1024 _logger = logging.getLogger(__name__) _standard_retry_protocol: Callable[[WrappedFn], WrappedFn] = retry( retry=retry_if_exception( lambda ex: isinstance(ex, (requests.Timeout, requests.HTTPError)) ), - wait=wait_exponential_jitter(initial=1.25), # ~ 20-25 minutes total time before completely failing + wait=wait_exponential_jitter( + initial=1.25 + ), # ~ 20-25 minutes total time before completely failing stop=stop_after_attempt(10), - before=lambda rcs: _logger.debug( - f"calling {str(rcs.fn)}, attempt: {rcs.attempt_number}" - ), + before=_before_call, ) @@ -150,10 +160,7 @@ def _download_with_progress( except Exception as e: force_close = True error_msg = f"Error downloading data file: {url} to: {output}, error: {e}" - _logger.error(error_msg) - string_buffer = StringIO("") - traceback.print_tb(e.__traceback__, file=string_buffer) - _logger.debug(f"tracing details: <<<{string_buffer.getvalue()}>>>") + _logger.debug(error_msg) raise e finally: if response is not None: diff --git a/tests/datasets_tests.py b/tests/datasets_tests.py index ce6f16b8..cc24cb0f 100644 --- a/tests/datasets_tests.py +++ b/tests/datasets_tests.py @@ -58,3 +58,7 @@ def test_andro_opens(self): def test_yomaha_opens(self): with datasets.yomaha() as ds: self.assertTrue(ds) + + def test_mosaic_opens(self): + with datasets.mosaic() as ds: + self.assertTrue(ds)