Skip to content

Commit

Permalink
feat: streamline config mgmt across components
Browse files Browse the repository at this point in the history
Signed-off-by: Panos Vagenas <[email protected]>
  • Loading branch information
vagenas committed Jul 28, 2023
1 parent 09b9d73 commit 25ffe58
Show file tree
Hide file tree
Showing 18 changed files with 344 additions and 185 deletions.
37 changes: 18 additions & 19 deletions deepsearch/artifacts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,28 @@ print(artf_mgr.get_cache_path())

### Usage with CLI
```console
$ deepsearch artifacts --help
Usage: deepsearch artifacts [OPTIONS] COMMAND [ARGS]...
$ deepsearch artifact --help

Manage artifacts
Usage: deepsearch artifact [OPTIONS] COMMAND [ARGS]...

Options:
--help Show this message and exit.
Manage artifacts

Commands:
download Download an artifact to cache
download-all Download all artifacts to cache
list-cache List artifacts in cache
list-index List artifacts in index
locate-cached-artifact Show path of a cached artifact
locate-default-cache Show cache path
╭─ Options ───────────────────────────────────────────────────────────────────╮
│ --help Show this message and exit. │
╰─────────────────────────────────────────────────────────────────────────────╯
╭─ Commands ──────────────────────────────────────────────────────────────────╮
│ cache Manage artifact caches. │
│ download Download an artifact to cache. │
│ index Manage artifact indices. │
│ locate Show path of a cached artifact. │
╰─────────────────────────────────────────────────────────────────────────────╯
```

### Environment variables
## Configuration
The artifact management facility extends the Toolkit configuration with its own settings.

Environment variables can be used for overriding internal defaults—for the latest status,
check [artifact_manager.py](artifact_manager.py).
For details check [Toolkit Configuration][toolkit_configuration].

- `DEEPSEARCH_ARTIFACT_INDEX`: default index path
- `DEEPSEARCH_ARTIFACT_CACHE`: default cache path
- `DEEPSEARCH_ARTIFACT_META_FILENAME`: name of JSON metadata file
- `DEEPSEARCH_ARTIFACT_URL_FIELD`: field for download URL within JSON metadata file
For example, the index path can be injected via env var `DEEPSEARCH_ARTIFACT_INDEX`.

[toolkit_configuration]: https://ds4sd.github.io/deepsearch-toolkit/guide/configuration/
53 changes: 21 additions & 32 deletions deepsearch/artifacts/artifact_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,21 @@
import os
import shutil
import tempfile
from enum import Enum
from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Optional
from urllib.parse import urlparse

import platformdirs
import requests
from tqdm import tqdm

DFLT_ARTFCT_INDEX_DIR = os.getenv("DEEPSEARCH_ARTIFACT_INDEX", default=os.getcwd())
DFLT_ARTFCT_CACHE_DIR = os.getenv(
"DEEPSEARCH_ARTIFACT_CACHE",
default=Path(platformdirs.user_cache_dir("deepsearch", "ibm")) / "artifact_cache",
)
ARTF_META_FILENAME = os.getenv("DEEPSEARCH_ARTIFACT_META_FILENAME", default="meta.info")
ARTF_META_URL_FIELD = os.getenv("DEEPSEARCH_ARTIFACT_URL_FIELD", default="static_url")
from deepsearch.artifacts.settings import ArtifactSettings


class ArtifactManager:
class HitStrategy(str, Enum):
RAISE = "raise"
PASS = "pass"
OVERWRITE = "overwrite"

def __init__(self, index=None, cache=None):
self._index_path = Path(index or DFLT_ARTFCT_INDEX_DIR)
self._cache_path = Path(cache or DFLT_ARTFCT_CACHE_DIR)
def __init__(self, settings: Optional[ArtifactSettings] = None):
self._settings = settings or ArtifactSettings()
self._index_path = Path(self._settings.index_path)
self._cache_path = Path(self._settings.cache_path)
self._cache_path.mkdir(parents=True, exist_ok=True)

def get_cache_path(self) -> Path:
Expand All @@ -46,47 +34,48 @@ def get_artifact_path_in_cache(self, artifact_name: str) -> Path:
def download_artifact_to_cache(
self,
artifact_name: str,
unpack_archives: bool = True,
hit_strategy: HitStrategy = HitStrategy.OVERWRITE,
with_progress_bar: bool = False,
) -> None:
artifact_path = self._cache_path / artifact_name
if artifact_path.exists():
if hit_strategy == self.HitStrategy.RAISE:
if self._settings.hit_strategy == ArtifactSettings.HitStrategy.RAISE:
raise ValueError(f'Artifact "{artifact_name}" already in cache')
elif hit_strategy == self.HitStrategy.PASS:
elif self._settings.hit_strategy == ArtifactSettings.HitStrategy.PASS:
return
elif hit_strategy == self.HitStrategy.OVERWRITE:
elif self._settings.hit_strategy == ArtifactSettings.HitStrategy.OVERWRITE:
shutil.rmtree(artifact_path)
else:
raise RuntimeError(f'Unexcpected value "{hit_strategy=}"')
raise RuntimeError(
f'Unexcpected value "{self._settings.hit_strategy=}"'
)

artifact_path.mkdir(exist_ok=False)

# read metadata from file
meta_path = self._index_path / artifact_name / ARTF_META_FILENAME
meta_path = self._index_path / artifact_name / self._settings.meta_filename
with open(meta_path, "r") as meta_file:
artifact_meta = json.load(meta_file)
download_url = artifact_meta[ARTF_META_URL_FIELD]
download_url = artifact_meta[self._settings.meta_url_field]

with tempfile.TemporaryDirectory() as temp_dir:
download_path = self._download_file(
artifact_name=artifact_name,
download_url=download_url,
download_root_path=Path(temp_dir),
with_progress_bar=with_progress_bar,
with_progress_bar=self._settings.progress_bar,
)
self._finalize_download(
download_path=download_path,
target_path=artifact_path,
unpack_archives=unpack_archives,
unpack_archives=self._settings.unpack_archives,
)

def get_artifacts_in_index(self) -> List[str]:
artifacts = []
for entry in os.scandir(self._index_path):
artifact_name = entry.name
meta_file_path = self._index_path / artifact_name / ARTF_META_FILENAME
meta_file_path = (
self._index_path / artifact_name / self._settings.meta_filename
)
if meta_file_path.exists():
artifacts.append(artifact_name)
return artifacts
Expand All @@ -96,7 +85,7 @@ def get_artifacts_in_cache(self) -> List[str]:
for entry in os.scandir(self._cache_path):
artifact_name = entry.name
artifact_path = self._cache_path / artifact_name
if artifact_path.exists():
if artifact_path.is_dir():
artifacts.append(artifact_name)
return artifacts

Expand Down Expand Up @@ -173,7 +162,7 @@ def _finalize_download(
shutil.move(dl_path_str, target_path / "")

def _get_artifact_meta(self, artifact_name: str) -> Dict:
file_path = self._index_path / artifact_name / ARTF_META_FILENAME
file_path = self._index_path / artifact_name / self._settings.meta_filename
if not file_path.exists():
raise FileNotFoundError(f'File "{file_path}" does not exist')
with open(file_path, "r") as file:
Expand Down
156 changes: 109 additions & 47 deletions deepsearch/artifacts/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,115 +1,177 @@
from typing import Optional

import typer
from typing_extensions import Annotated

from deepsearch.artifacts.artifact_manager import (
DFLT_ARTFCT_CACHE_DIR,
DFLT_ARTFCT_INDEX_DIR,
ArtifactManager,
from deepsearch.artifacts.artifact_manager import ArtifactManager
from deepsearch.artifacts.settings import (
FALLBACK_CACHE_PATH,
FALLBACK_INDEX_PATH,
ArtifactSettings,
)
from deepsearch.core.cli.utils import cli_handler

index_app = typer.Typer(no_args_is_help=True, add_completion=False)
cache_app = typer.Typer(no_args_is_help=True, add_completion=False)

app = typer.Typer(no_args_is_help=True, add_completion=False)
app.add_typer(index_app, name="index", help="Manage artifact indices.")
app.add_typer(cache_app, name="cache", help="Manage artifact caches.")


def _get_unset_case_help(fallback=None) -> str:
fallback_msg = "" if fallback is None else f' (fallback: "{fallback}")'
return f"If not set, resolved from environment{fallback_msg}."


INDEX_OPTION = typer.Option(
None,
"--index",
"-i",
help="Artifact index path (default set via env var DEEPSEARCH_ARTIFACT_INDEX, else current working dir).",
help=f"Artifact index path. {_get_unset_case_help(FALLBACK_INDEX_PATH)}",
)

CACHE_OPTION = typer.Option(
None,
"--cache",
"-c",
help="Artifact cache path (default set via env var DEEPSEARCH_ARTIFACT_CACHE, else platform-specific).",
help=f"Artifact cache path. {_get_unset_case_help(FALLBACK_CACHE_PATH)}",
)

HIT_STRATEGY_OPTION = typer.Option(
ArtifactManager.HitStrategy.OVERWRITE,
ArtifactSettings.HitStrategy.OVERWRITE,
"--hit-strategy",
"-s",
help="How to handle case of artifact being already in cache.",
help="Controls handling of case artifact being already in cache.",
)

UNPACK_OPTION = typer.Option(
True,
help="Controls archive unpacking.",
)

PROGRESS_BAR_OPTION = typer.Option(
True,
help="Controls progress bar display.",
)


@app.command(help="List artifacts in index")
def _create_settings(
index_path: Optional[str] = None,
cache_path: Optional[str] = None,
hit_strategy: Optional[ArtifactSettings.HitStrategy] = None,
unpack_archives: Optional[bool] = None,
progress_bar: Optional[bool] = None,
) -> ArtifactSettings:
settings = ArtifactSettings()
if index_path is not None:
settings.index_path = index_path
if cache_path is not None:
settings.cache_path = cache_path
if hit_strategy is not None:
settings.hit_strategy = hit_strategy
if unpack_archives is not None:
settings.unpack_archives = unpack_archives
if progress_bar is not None:
settings.progress_bar = progress_bar
return settings


@index_app.command(name="list", help="List artifacts in index.")
@cli_handler()
def list_index(
index: str = INDEX_OPTION,
index_path: Annotated[
Optional[str],
typer.Argument(help=_get_unset_case_help(FALLBACK_INDEX_PATH)),
] = None
):
artf_mgr = ArtifactManager(index=index)
artf_mgr = ArtifactManager(settings=_create_settings(index_path=index_path))
artifacts = artf_mgr.get_artifacts_in_index()
for artf in artifacts:
typer.echo(artf)


@app.command(help="List artifacts in cache")
@cache_app.command(name="list", help="List artifacts in cache.")
@cli_handler()
def list_cache(
cache: str = CACHE_OPTION,
cache_path: Annotated[
Optional[str],
typer.Argument(help=_get_unset_case_help(FALLBACK_CACHE_PATH)),
] = None
):
artf_mgr = ArtifactManager(cache=cache)
artf_mgr = ArtifactManager(settings=_create_settings(cache_path=cache_path))
artifacts = artf_mgr.get_artifacts_in_cache()
for artf in artifacts:
typer.echo(artf)


@app.command(help="Show cache path")
@cache_app.command(name="locate", help="Show default cache path.")
@cli_handler()
def locate_default_cache():
def locate_cache():
artf_mgr = ArtifactManager()
path_str = str(artf_mgr.get_cache_path().resolve())
typer.echo(path_str)


@app.command(help="Show path of a cached artifact")
@app.command(
name="locate", help="Show path of a cached artifact.", no_args_is_help=True
)
@cli_handler()
def locate_cached_artifact(
artifact_name: str,
cache: str = CACHE_OPTION,
):
artf_mgr = ArtifactManager(cache=cache)

artf_mgr = ArtifactManager(settings=_create_settings(cache_path=cache))
artf_path = artf_mgr.get_artifact_path_in_cache(artifact_name=artifact_name)
artifact_path_str = str(artf_path.resolve())
typer.echo(artifact_path_str)


@app.command(help="Download an artifact to cache")
@app.command(
name="download", help="Download an artifact to cache.", no_args_is_help=True
)
@cli_handler()
def download(
artifact_name: str,
index: str = INDEX_OPTION,
cache: str = CACHE_OPTION,
hit_strategy: ArtifactManager.HitStrategy = HIT_STRATEGY_OPTION,
unpack: bool = typer.Option(True),
progress_bar: bool = typer.Option(True),
index: Optional[str] = INDEX_OPTION,
cache: Optional[str] = CACHE_OPTION,
hit_strategy: ArtifactSettings.HitStrategy = HIT_STRATEGY_OPTION,
unpack: bool = UNPACK_OPTION,
progress_bar: bool = PROGRESS_BAR_OPTION,
):
artf_mgr = ArtifactManager(index=index, cache=cache)
artf_mgr.download_artifact_to_cache(
artifact_name=artifact_name,
unpack_archives=unpack,
hit_strategy=hit_strategy,
with_progress_bar=progress_bar,
artf_mgr = ArtifactManager(
settings=_create_settings(
index_path=index,
cache_path=cache,
hit_strategy=hit_strategy,
unpack_archives=unpack,
progress_bar=progress_bar,
),
)
artf_mgr.download_artifact_to_cache(artifact_name=artifact_name)


@app.command(help="Download all artifacts to cache")
@index_app.command(name="download", help="Download all index artifacts to cache.")
@cli_handler()
def download_all(
index: str = INDEX_OPTION,
cache: str = CACHE_OPTION,
hit_strategy: ArtifactManager.HitStrategy = HIT_STRATEGY_OPTION,
unpack: bool = typer.Option(True),
progress_bar: bool = typer.Option(True),
index: Annotated[
Optional[str],
typer.Argument(help=_get_unset_case_help(FALLBACK_INDEX_PATH)),
] = None,
cache: Optional[str] = CACHE_OPTION,
hit_strategy: ArtifactSettings.HitStrategy = HIT_STRATEGY_OPTION,
unpack: bool = UNPACK_OPTION,
progress_bar: bool = PROGRESS_BAR_OPTION,
):
artf_mgr = ArtifactManager(index=index, cache=cache)
for artf_name in artf_mgr.get_artifacts_in_index():
artf_mgr.download_artifact_to_cache(
artifact_name=artf_name,
unpack_archives=unpack,
artf_mgr = ArtifactManager(
settings=_create_settings(
index_path=index,
cache_path=cache,
hit_strategy=hit_strategy,
with_progress_bar=progress_bar,
)


if __name__ == "__main__":
app()
unpack_archives=unpack,
progress_bar=progress_bar,
),
)
for artf_name in artf_mgr.get_artifacts_in_index():
artf_mgr.download_artifact_to_cache(artifact_name=artf_name)
Loading

0 comments on commit 25ffe58

Please sign in to comment.