From 89c760ea2257a08e0646c4a7ce00aa2b54b72b1a Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Tue, 15 Mar 2022 20:32:43 +0100 Subject: [PATCH 1/3] Serve repodata.json with redirect --- environment.yml | 2 +- quetz/main.py | 101 +++++++++++++++++++---------------- quetz/pkgstores.py | 2 - quetz/tests/api/test_main.py | 28 ++++++++++ quetz/tests/test_mirror.py | 52 ++++++++++++------ 5 files changed, 119 insertions(+), 66 deletions(-) diff --git a/environment.yml b/environment.yml index 62df5b08..da22aa1e 100644 --- a/environment.yml +++ b/environment.yml @@ -8,7 +8,7 @@ dependencies: - typer - authlib - psycopg2 - - httpx=0.20.0 + - httpx=0.22.0 - sqlalchemy - sqlalchemy-utils - sqlite diff --git a/quetz/main.py b/quetz/main.py index a9e60319..006e8feb 100644 --- a/quetz/main.py +++ b/quetz/main.py @@ -1608,10 +1608,8 @@ def serve_path( session=Depends(get_remote_session), dao: Dao = Depends(get_dao), ): - - chunk_size = 10_000 - is_package_request = path.endswith((".tar.bz2", ".conda")) + is_repodata_request = path.endswith(".json") package_name = None if is_package_request: @@ -1630,65 +1628,74 @@ def serve_path( except ValueError: pass + # if we exclude the package from syncing, redirect to original URL if is_package_request and channel.mirror_channel_url: - # if we exclude the package from syncing, redirect to original URL channel_proxylist = json.loads(channel.channel_metadata).get('proxylist', []) if channel_proxylist and package_name and package_name in channel_proxylist: return RedirectResponse(f"{channel.mirror_channel_url}/{path}") + fsize = fmtime = fetag = None + if channel.mirror_channel_url and channel.mirror_mode == "proxy": repository = RemoteRepository(channel.mirror_channel_url, session) - if not pkgstore.file_exists(channel.name, path): + if is_repodata_request: + # Invalidate repodata.json and current_repodata.json after channel.ttl seconds + try: + fsize, fmtime, fetag = pkgstore.get_filemetadata(channel.name, path) + cache_miss = time.time() - fmtime >= channel.ttl + except FileNotFoundError: + cache_miss = True + else: + cache_miss = not pkgstore.file_exists(channel.name, path) + if cache_miss: download_remote_file(repository, pkgstore, channel.name, path) - elif path.endswith(".json"): - # repodata.json and current_repodata.json are cached locally - # for channel.ttl seconds - _, fmtime, _ = pkgstore.get_filemetadata(channel.name, path) - if time.time() - fmtime >= channel.ttl: - download_remote_file(repository, pkgstore, channel.name, path) - - if ( - is_package_request or pkgstore.kind == "LocalStore" - ) and pkgstore.support_redirect: - return RedirectResponse(pkgstore.url(channel.name, path)) - - def iter_chunks(fid): - while True: - data = fid.read(chunk_size) - if not data: - break - yield data - - if path == "" or path.endswith("/"): - path += "index.html" - package_content_iter = None - - headers = {} - if accept_encoding and 'gzip' in accept_encoding and path.endswith('.json'): - # return gzipped response - try: - package_content_iter = iter_chunks( - pkgstore.serve_path(channel.name, path + '.gz') - ) - path += '.gz' - headers['Content-Encoding'] = 'gzip' - headers['Content-Type'] = 'application/json' - except FileNotFoundError: - pass + fsize = fmtime = fetag = None - while not package_content_iter: - try: - package_content_iter = iter_chunks(pkgstore.serve_path(channel.name, path)) + gzip_exists = ( + is_repodata_request + and accept_encoding + and 'gzip' in accept_encoding + and pkgstore.file_exists(channel.name, path + ".gz") + ) + # Redirect response + if (is_package_request or is_repodata_request) and pkgstore.support_redirect: + return RedirectResponse( + pkgstore.url(channel.name, path + ".gz" if gzip_exists else path) + ) + + # Streaming response + def serve_file(path): + try: + return pkgstore.serve_path(channel.name, path) except FileNotFoundError: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"{channel.name}/{path} not found", ) - except IsADirectoryError: - path += "/index.html" - fsize, fmtime, fetag = pkgstore.get_filemetadata(channel.name, path) + if gzip_exists: + path += '.gz' + package_content = serve_file(path) + headers = { + 'Content-Encoding': 'gzip', + 'Content-Type': 'application/json', + } + else: + if path == "" or path.endswith("/"): + path += "index.html" + package_content = serve_file(path) + else: + try: + package_content = serve_file(path) + except IsADirectoryError: + path += "/index.html" + package_content = serve_file(path) + headers = {} + + if fsize is None: + # Maybe we already got (fsize, fmtime, fetag) above + fsize, fmtime, fetag = pkgstore.get_filemetadata(channel.name, path) headers.update( { 'Cache-Control': f'max-age={channel.ttl}', @@ -1697,6 +1704,8 @@ def iter_chunks(fid): 'ETag': fetag, } ) + chunk_size = 10_000 + package_content_iter = iter(lambda: package_content.read(chunk_size), b"") return StreamingResponse(package_content_iter, headers=headers) diff --git a/quetz/pkgstores.py b/quetz/pkgstores.py index 2edac0aa..9cd566d6 100644 --- a/quetz/pkgstores.py +++ b/quetz/pkgstores.py @@ -572,8 +572,6 @@ def __init__(self, config): @property def support_redirect(self): - # `gcsfs` currently doesnt support signing yet. Once this is implemented we - # can enable this again. return True @contextlib.contextmanager diff --git a/quetz/tests/api/test_main.py b/quetz/tests/api/test_main.py index 16fba400..63a55b16 100644 --- a/quetz/tests/api/test_main.py +++ b/quetz/tests/api/test_main.py @@ -1,9 +1,37 @@ import datetime +import io from unittest.mock import ANY import pytest from quetz.metrics.db_models import PackageVersionMetric +from quetz.tasks.indexing import update_indexes + + +def test_index_html(package_version, channel_name, client, mocker): + def serve_path(channel_name, path): + if path.endswith("index.html"): + return io.BytesIO(b"index.html content") + else: + raise IsADirectoryError + + pkgstore = mocker.Mock() + pkgstore.get_filemetadata.return_value = (0, 0, "") + pkgstore.url.side_effect = lambda chan, p: f"{chan}/{p}" + pkgstore.serve_path.side_effect = serve_path + mocker.patch("quetz.main.pkgstore", pkgstore) + + for url in [ + f"/get/{channel_name}", + f"/get/{channel_name}/", + f"/get/{channel_name}/index.html", + f"/get/{channel_name}/linux-64", + f"/get/{channel_name}/linux-64/", + f"/get/{channel_name}/linux-64/index.html", + ]: + response = client.get(url, allow_redirects=False) + assert response.status_code == 200 + assert response.text == "index.html content" def test_get_package_list(package_version, package_name, channel_name, client): diff --git a/quetz/tests/test_mirror.py b/quetz/tests/test_mirror.py index 8301006b..13132d6c 100644 --- a/quetz/tests/test_mirror.py +++ b/quetz/tests/test_mirror.py @@ -459,7 +459,6 @@ def test_synchronisation_sha( n_new_packages, arch, package_version, - mocker, ): pkgstore = config.get_package_store() rules = Rules("", {"user_id": str(uuid.UUID(bytes=user.id))}, db) @@ -521,7 +520,6 @@ def test_synchronisation_no_checksums_in_db( n_new_packages, arch, package_version, - mocker, ): package_info = '{"size": 5000, "subdirs":["noarch"]}' @@ -562,7 +560,15 @@ def close(self): assert len(versions) == n_new_packages + 1 -def test_download_remote_file(client, owner, dummy_repo): +@pytest.fixture(params=[True, False]) +def main_pkgstore_redirect(request, config, monkeypatch): + from quetz import main + + monkeypatch.setattr(main.pkgstore, "redirect_enabled", request.param) + return request.param + + +def test_download_remote_file(client, owner, dummy_repo, main_pkgstore_redirect): """Test downloading from cache.""" response = client.get("/api/dummylogin/bartosz") assert response.status_code == 200 @@ -605,7 +611,9 @@ def test_download_remote_file(client, owner, dummy_repo): assert dummy_repo == [("http://host/test_file_2.txt")] -def test_download_remote_file_in_parallel(client, owner, dummy_repo): +def test_download_remote_file_in_parallel( + client, owner, dummy_repo, main_pkgstore_redirect +): """Test downloading in parallel.""" response = client.get("/api/dummylogin/bartosz") assert response.status_code == 200 @@ -636,7 +644,7 @@ def get_remote_file(filename): assert dummy_repo == [(f"http://host/{test_file}")] -def test_proxy_repodata_cached(client, owner, dummy_repo): +def test_proxy_repodata_cached(client, owner, dummy_repo, main_pkgstore_redirect): """Test downloading from cache.""" response = client.get("/api/dummylogin/bartosz") assert response.status_code == 200 @@ -652,13 +660,15 @@ def test_proxy_repodata_cached(client, owner, dummy_repo): ) assert response.status_code == 201 - response = client.get("/get/proxy-channel-2/repodata.json") - assert response.status_code == 200 - assert response.content == b"Hello world!" - - response = client.get("/get/proxy-channel-2/repodata.json") - assert response.status_code == 200 - assert response.content == b"Hello world!" + for i in range(2): + response = client.get( + "/get/proxy-channel-2/repodata.json", allow_redirects=False + ) + if main_pkgstore_redirect: + assert 300 <= response.status_code < 400 + else: + assert response.status_code == 200 + assert response.content == b"Hello world!" # repodata.json was cached locally and downloaded from the # the remote only once @@ -674,7 +684,9 @@ def test_method_not_implemented_for_proxies(client, proxy_channel): assert "not implemented" in response.json()["detail"] -def test_api_methods_for_mirror_channels(client, mirror_channel): +def test_api_methods_for_mirror_channels( + client, mirror_channel, main_pkgstore_redirect +): """mirror-mode channels should have all standard API calls""" response = client.get("/api/channels/{}/packages".format(mirror_channel.name)) @@ -683,9 +695,13 @@ def test_api_methods_for_mirror_channels(client, mirror_channel): response = client.get( "/get/{}/missing/path/file.json".format(mirror_channel.name), + allow_redirects=False, ) - assert response.status_code == 404 - assert "file.json not found" in response.json()["detail"] + if main_pkgstore_redirect: + assert 300 <= response.status_code < 400 + else: + assert response.status_code == 404 + assert "file.json not found" in response.json()["detail"] @pytest.mark.parametrize( @@ -820,7 +836,9 @@ def test_add_and_register_mirror(auth_client, dummy_session_mock): ] ], ) -def test_wrong_package_format(client, dummy_repo, owner, job_supervisor): +def test_wrong_package_format( + client, dummy_repo, owner, job_supervisor, main_pkgstore_redirect +): response = client.get("/api/dummylogin/bartosz") assert response.status_code == 200 @@ -1062,7 +1080,7 @@ def test_includelist_and_excludelist_mirror_channel(owner, client): @pytest.mark.parametrize("mirror_mode", ["proxy", "mirror"]) -def test_proxylist_mirror_channel(owner, client, mirror_mode): +def test_proxylist_mirror_channel(owner, client, mirror_mode, main_pkgstore_redirect): response = client.get("/api/dummylogin/bartosz") assert response.status_code == 200 From 29a4ffba86576b1e1f162e78b743ef7c7cef7825 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Wed, 16 Mar 2022 18:23:53 +0100 Subject: [PATCH 2/3] Don't redirect to .gz --- quetz/main.py | 39 ++++++++++++++++++------------------ quetz/tests/api/test_main.py | 1 - 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/quetz/main.py b/quetz/main.py index 006e8feb..5239140b 100644 --- a/quetz/main.py +++ b/quetz/main.py @@ -1651,37 +1651,38 @@ def serve_path( download_remote_file(repository, pkgstore, channel.name, path) fsize = fmtime = fetag = None - gzip_exists = ( - is_repodata_request - and accept_encoding - and 'gzip' in accept_encoding - and pkgstore.file_exists(channel.name, path + ".gz") - ) - # Redirect response if (is_package_request or is_repodata_request) and pkgstore.support_redirect: - return RedirectResponse( - pkgstore.url(channel.name, path + ".gz" if gzip_exists else path) - ) + return RedirectResponse(pkgstore.url(channel.name, path)) # Streaming response - def serve_file(path): + if is_repodata_request and accept_encoding and 'gzip' in accept_encoding: + # return gzipped response try: - return pkgstore.serve_path(channel.name, path) + package_content = pkgstore.serve_path(channel.name, path + ".gz") + have_gzip = True except FileNotFoundError: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"{channel.name}/{path} not found", - ) + have_gzip = False + else: + have_gzip = False - if gzip_exists: + if have_gzip: path += '.gz' - package_content = serve_file(path) headers = { 'Content-Encoding': 'gzip', 'Content-Type': 'application/json', } else: + + def serve_file(path): + try: + return pkgstore.serve_path(channel.name, path) + except FileNotFoundError: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"{channel.name}/{path} not found", + ) + if path == "" or path.endswith("/"): path += "index.html" package_content = serve_file(path) @@ -1693,7 +1694,7 @@ def serve_file(path): package_content = serve_file(path) headers = {} - if fsize is None: + if fsize is None or fetag is None or fmtime is None: # check all for mypy # Maybe we already got (fsize, fmtime, fetag) above fsize, fmtime, fetag = pkgstore.get_filemetadata(channel.name, path) headers.update( diff --git a/quetz/tests/api/test_main.py b/quetz/tests/api/test_main.py index 63a55b16..791f108b 100644 --- a/quetz/tests/api/test_main.py +++ b/quetz/tests/api/test_main.py @@ -5,7 +5,6 @@ import pytest from quetz.metrics.db_models import PackageVersionMetric -from quetz.tasks.indexing import update_indexes def test_index_html(package_version, channel_name, client, mocker): From 1dd46e72fd78258c6254d2eadfef01920a4dd253 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Fri, 25 Mar 2022 14:08:59 +0100 Subject: [PATCH 3/3] Add Content-Type support to GCS pre-signed URLs --- quetz/main.py | 9 ++++++--- quetz/pkgstores.py | 27 ++++++++++++++++++++------- quetz/tests/api/test_main.py | 2 +- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/quetz/main.py b/quetz/main.py index 5239140b..3af243fa 100644 --- a/quetz/main.py +++ b/quetz/main.py @@ -1602,11 +1602,12 @@ async def stop_sync_donwload_counts(): @app.head("/get/{channel_name}/{path:path}") @app.get("/get/{channel_name}/{path:path}") def serve_path( - path, + path: str, channel: db_models.Channel = Depends(get_channel_allow_proxy), accept_encoding: Optional[str] = Header(None), - session=Depends(get_remote_session), + session: dict = Depends(get_remote_session), dao: Dao = Depends(get_dao), + content_type: Optional[str] = Header(None), ): is_package_request = path.endswith((".tar.bz2", ".conda")) is_repodata_request = path.endswith(".json") @@ -1653,7 +1654,9 @@ def serve_path( # Redirect response if (is_package_request or is_repodata_request) and pkgstore.support_redirect: - return RedirectResponse(pkgstore.url(channel.name, path)) + return RedirectResponse( + pkgstore.url(channel.name, path, content_type=content_type) + ) # Streaming response if is_repodata_request and accept_encoding and 'gzip' in accept_encoding: diff --git a/quetz/pkgstores.py b/quetz/pkgstores.py index 9cd566d6..60628f28 100644 --- a/quetz/pkgstores.py +++ b/quetz/pkgstores.py @@ -15,7 +15,7 @@ from contextlib import contextmanager from os import PathLike from threading import Lock -from typing import IO, BinaryIO, List, NoReturn, Tuple, Union +from typing import IO, BinaryIO, List, NoReturn, Optional, Tuple, Union import fsspec from tenacity import retry, retry_if_exception_type, stop_after_attempt @@ -73,7 +73,7 @@ def list_files(self, channel: str) -> List[str]: pass @abc.abstractmethod - def url(self, channel: str, src: str, expires: int = 0) -> str: + def url(self, channel: str, src: str, expires: int = 0, content_type=None) -> str: pass @abc.abstractmethod @@ -199,7 +199,11 @@ def list_files(self, channel: str): channel_dir = os.path.join(self.channels_dir, channel) return [os.path.relpath(f, channel_dir) for f in self.fs.find(channel_dir)] - def url(self, channel: str, src: str, expires=0): + def url( + self, channel: str, src: str, expires=0, content_type: Optional[str] = None + ): + # Note: content_type is ignored because it's not needed + if self.redirect_enabled: # generate url + secret if necessary if self.redirect_secret: @@ -376,7 +380,11 @@ def remove_prefix(text, prefix): with self._get_fs() as fs: return [remove_prefix(f, channel_bucket) for f in fs.find(channel_bucket)] - def url(self, channel: str, src: str, expires=3600): + def url( + self, channel: str, src: str, expires=3600, content_type: Optional[str] = None + ): + if content_type is not None: + raise NotImplementedError("'content_type' is not supported for S3") # expires is in seconds, so the default is 60 minutes! with self._get_fs() as fs: return fs.url(path.join(self._bucket_map(channel), src), expires) @@ -517,9 +525,12 @@ def remove_prefix(text, prefix): remove_prefix(f, channel_container) for f in fs.find(channel_container) ] - def url(self, channel: str, src: str, expires=3600): + def url( + self, channel: str, src: str, expires=3600, content_type: Optional[str] = None + ): # expires is in seconds, so the default is 60 minutes! with self._get_fs() as fs: + # Note: content_type is ignored because it's not needed return fs.url(path.join(self._container_map(channel), src), expires) def get_filemetadata(self, channel: str, src: str): @@ -666,7 +677,7 @@ def remove_prefix(text, prefix): remove_prefix(f, channel_container) for f in fs.find(channel_container) ] - def url(self, channel: str, src: str, expires=3600): + def url(self, channel: str, src: str, expires=3600, content_type=None): # expires is in seconds, so the default is 60 minutes! with self._get_fs() as fs: expiration_timestamp = ( @@ -674,7 +685,9 @@ def url(self, channel: str, src: str, expires=3600): + expires ) redirect_url = fs.sign( - path.join(self._bucket_map(channel), src), expiration_timestamp + path.join(self._bucket_map(channel), src), + expiration_timestamp, + content_type=content_type, ) return redirect_url diff --git a/quetz/tests/api/test_main.py b/quetz/tests/api/test_main.py index 791f108b..e5da1fec 100644 --- a/quetz/tests/api/test_main.py +++ b/quetz/tests/api/test_main.py @@ -16,7 +16,7 @@ def serve_path(channel_name, path): pkgstore = mocker.Mock() pkgstore.get_filemetadata.return_value = (0, 0, "") - pkgstore.url.side_effect = lambda chan, p: f"{chan}/{p}" + pkgstore.url.side_effect = lambda chan, p, **ignored: f"{chan}/{p}" pkgstore.serve_path.side_effect = serve_path mocker.patch("quetz.main.pkgstore", pkgstore)