Skip to content

Commit

Permalink
🐛 fix mosaic dataset bug, using empty buffers (Cloud-Drift#389)
Browse files Browse the repository at this point in the history
* fix mosaic dataset bug, using empty buffers
  • Loading branch information
kevinsantana11 authored Apr 11, 2024
1 parent c488f0d commit 383f83d
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 13 deletions.
5 changes: 3 additions & 2 deletions clouddrift/adapters/mosaic.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ def get_dataframes() -> tuple[pd.DataFrame, pd.DataFrame]:
range(len(sensor_ids)), key=lambda k: order_index[sensor_ids[k]]
)
sorted_data_urls = [data_urls[i] for i in sorted_indices]
buffers = [BytesIO(b"") for _ in range(len(sorted_data_urls))]
requests = [(url, BytesIO(b""), None) for url in sorted_data_urls]
buffers = [BytesIO() for _ in range(len(sorted_data_urls))]
requests = [(url, buffer, None) for url, buffer in zip(sorted_data_urls, buffers)]

download_with_progress(requests, desc="Downloading data")
[b.seek(0) for b in buffers]
dfs = [pd.read_csv(b) for b in buffers]
obs_df = pd.concat(dfs)

Expand Down
29 changes: 18 additions & 11 deletions clouddrift/adapters/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import concurrent.futures
import logging
import os
import traceback
from collections.abc import Callable, Sequence
from datetime import datetime
from io import BufferedIOBase, StringIO
from io import BufferedIOBase
from typing import Callable, Sequence

import requests
from requests import Response
from tenacity import (
RetryCallState,
WrappedFn,
retry,
retry_if_exception,
Expand All @@ -17,17 +17,27 @@
)
from tqdm import tqdm


def _before_call(rcs: RetryCallState):
    """Log a warning before each retried download attempt.

    Used as the ``before`` hook of the module's retry policy. The first
    attempt is silent; only attempts after the first are reported.

    Parameters
    ----------
    rcs : RetryCallState
        State of the call about to be made. ``rcs.args[0]`` is read as the
        download source and ``rcs.args[1]`` as the destination.
    """
    if rcs.attempt_number <= 1:
        return

    src = rcs.args[0]
    # A buffer destination is logged under a fixed label instead of the object itself.
    dst = "io-buffer" if isinstance(rcs.args[1], BufferedIOBase) else rcs.args[1]
    # Logger.warn is a deprecated alias of Logger.warning; the label order
    # now matches the (src, dst) tuple that is actually logged.
    _logger.warning(
        "retrying download request for (src, dst): %s, attempt: %s",
        (src, dst),
        rcs.attempt_number,
    )


_CHUNK_SIZE = 1024
_logger = logging.getLogger(__name__)

# Shared retry policy: retry only on request timeouts and HTTP errors, back
# off exponentially with jitter, give up after 10 attempts, and call
# `_before_call` before every attempt. Each keyword is passed exactly once;
# repeating `wait=` or `before=` in the same call is a SyntaxError.
_standard_retry_protocol: Callable[[WrappedFn], WrappedFn] = retry(
    retry=retry_if_exception(
        lambda ex: isinstance(ex, (requests.Timeout, requests.HTTPError))
    ),
    wait=wait_exponential_jitter(
        initial=1.25
    ),  # ~ 20-25 minutes total time before completely failing
    stop=stop_after_attempt(10),
    before=_before_call,
)


Expand Down Expand Up @@ -150,10 +160,7 @@ def _download_with_progress(
except Exception as e:
force_close = True
error_msg = f"Error downloading data file: {url} to: {output}, error: {e}"
_logger.error(error_msg)
string_buffer = StringIO("")
traceback.print_tb(e.__traceback__, file=string_buffer)
_logger.debug(f"tracing details: <<<{string_buffer.getvalue()}>>>")
_logger.debug(error_msg)
raise e
finally:
if response is not None:
Expand Down
4 changes: 4 additions & 0 deletions tests/datasets_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ def test_andro_opens(self):
def test_yomaha_opens(self):
    """Check that ``datasets.yomaha()`` opens and yields a truthy dataset."""
    with datasets.yomaha() as dataset:
        self.assertTrue(dataset)

def test_mosaic_opens(self):
    """Check that ``datasets.mosaic()`` opens and yields a truthy dataset."""
    with datasets.mosaic() as dataset:
        self.assertTrue(dataset)

0 comments on commit 383f83d

Please sign in to comment.