handle tags with only aggregate datasets #873

Merged 2 commits on Oct 10, 2024

alyx/misc/management/commands/one_cache.py (33 changes: 29 additions & 4 deletions)

@@ -150,19 +150,22 @@ def generate_tables(self, tables, export_qc=False, **kwargs) -> list:
             if table.lower() == 'sessions':
                 logger.debug('Generating sessions DataFrame')
                 tbl, filename = self._save_table(generate_sessions_frame(**kwargs), table, dry=dry)
-                to_compress[filename] = tbl
+                if filename is not None:
+                    to_compress[filename] = tbl
             elif table.lower() == 'datasets':
                 logger.debug('Generating datasets DataFrame')
                 tbl, filename = self._save_table(generate_datasets_frame(**kwargs), table, dry=dry)
-                to_compress[filename] = tbl
+                if filename is not None:
+                    to_compress[filename] = tbl
             else:
                 raise ValueError(f'Unknown table "{table}"')
 
         if export_qc:
             tbl, filename = self._save_qc(dry=dry, tags=kwargs.get('tags'))
-            to_compress[filename] = tbl
+            if filename is not None:
+                to_compress[filename] = tbl
 
-        if self.compress:
+        if self.compress and len(to_compress) > 0:
             return list(self._compress_tables(to_compress))
         else:
             return list(to_compress.keys())
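
The pattern in this hunk: _save_table (next hunk) now returns (None, None) when handed an
empty table, so each caller skips the dict insert, and the extra len(to_compress) > 0
condition stops the command from compressing an empty file set. A minimal standalone
sketch of the failure mode being guarded against (illustrative, not Alyx code):

    # Without the guard, an empty table would be filed under a None key and the
    # compression step would later trip over a None filename.
    to_compress = {}
    tbl, filename = None, None  # what _save_table now returns for an empty table
    if filename is not None:
        to_compress[filename] = tbl
    assert to_compress == {}  # nothing queued, so compression is skipped
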
@@ -178,6 +181,11 @@ def _save_table(self, table, name, **kwargs):
         :param dry: If True, does not actually write to disk
         :return: A PyArrow table and the full path to the saved file
         """
+
+        if table is None:
+            logger.warning(f'Table {name} is empty, not saving')
+            return None, None
+
         if not kwargs.get('dry'):
             logger.info(f'Saving table "{name}" to {self.dst_dir}...')
             scheme = urllib.parse.urlparse(self.dst_dir).scheme or 'file'
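
The early return gives _save_table a uniform (None, None) contract: both elements are
None together, so callers only need to test the filename. A sketch of that contract
outside Alyx, with hypothetical names:

    def save_table(table, name):
        """Return (table, filename), or (None, None) when there is nothing to save."""
        if table is None:
            return None, None
        filename = f'{name}.pqt'
        # ... serialize the table to disk here ...
        return table, filename

    tbl, filename = save_table(None, 'sessions')
    assert tbl is None and filename is None
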
@@ -197,6 +205,12 @@ def _save_qc(self, dry=False, tags=None):
             sessions = sessions.filter(data_dataset_session_related__tags__name__in=tags)
         else:
             sessions = sessions.filter(data_dataset_session_related__tags__name=tags)
+
+        if sessions.count() == 0:
+            logger.warning(f'No datasets associated with sessions found for {tags}, '
+                           f'returning empty dataframe')
+            return
+
         qc = list(sessions.values('pk', 'qc', 'extended_qc').distinct())
         outcome_map = dict(Session.QC_CHOICES)
         for d in qc:  # replace enumeration int with string
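
An aside on the emptiness check: in Django, QuerySet.exists() is usually cheaper than
count() == 0, since the database can stop at the first matching row instead of counting
them all. An equivalent guard over the same sessions queryset would be:

    if not sessions.exists():
        logger.warning(f'No datasets associated with sessions found for {tags}, '
                       f'returning empty dataframe')
        return
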
@@ -327,6 +341,12 @@ def generate_sessions_frame(tags=None) -> pd.DataFrame:
         query = query.filter(data_dataset_session_related__tags__name__in=tags)
     else:
         query = query.filter(data_dataset_session_related__tags__name=tags)
+
+    if query.count() == 0:
+        logger.warning(f'No datasets associated with sessions found for {tags}, '
+                       f'returning empty dataframe')
+        return
+
     df = pd.DataFrame.from_records(query.values(*fields).distinct())
     logger.debug(f'Raw session frame = {getsizeof(df) / 1024**2} MiB')
     # Rename, sort fields
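
Note the bare return: generate_sessions_frame now yields None for a tag with no
session-linked datasets, which is exactly the case the new "if table is None" guard in
_save_table absorbs. A minimal trace of how the two changes compose, using a made-up
tag name:

    frame = generate_sessions_frame(tags='some_release_tag')  # hypothetical tag
    if frame is None:
        # _save_table would log 'Table sessions is empty, not saving' and hand
        # back (None, None), so nothing is queued for compression
        pass
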
@@ -380,6 +400,11 @@ def generate_datasets_frame(tags=None, batch_size=100_000) -> pd.DataFrame:
     ds = ds.annotate(exists_flatiron=Exists(on_flatiron), exists_aws=Exists(on_aws))
     ds = ds.filter(Q(exists_flatiron=True) | Q(exists_aws=True), session__isnull=False)
 
+    if ds.count() == 0:
+        logger.warning(f'No datasets associated with sessions found for {tags}, '
+                       f'returning empty dataframe')
+        return
+
     # fields to keep from Dataset table
     fields = (
         'id', 'name', 'file_size', 'hash', 'collection', 'revision__name', 'default_dataset',
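
Taken together, the sessions, datasets and QC generators now short-circuit with a
warning instead of erroring when a tag resolves to no session-linked datasets, which is
what happens for tags attached only to aggregate datasets (the session__isnull=False
filter above excludes them). Assuming the management command is invoked in the usual
Django way and accepts a tags option (the flag and tag name below are illustrative, not
confirmed by this diff):

    python manage.py one_cache sessions datasets --tags some_release_tag

A tag with only aggregate datasets now logs the warnings above and exits cleanly rather
than failing part-way through the cache build.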