Serve repodata.json with redirect #506

Open: wants to merge 3 commits into main
environment.yml (2 changes: 1 addition & 1 deletion)

@@ -8,7 +8,7 @@ dependencies:
   - typer
   - authlib
   - psycopg2
-  - httpx=0.20.0
+  - httpx=0.22.0
   - sqlalchemy
   - sqlalchemy-utils
   - sqlite
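Not part of the diff, but worth noting alongside the httpx bump: with this PR, `/get/...` answers repodata and package requests with a redirect where the store supports it, and recent httpx versions do not follow redirects unless asked. A minimal client-side sketch (the deployment URL is hypothetical):

```python
import httpx

# Hypothetical Quetz deployment; redirect following is opt-in in httpx.
url = "https://quetz.example.com/get/mychannel/linux-64/repodata.json"
response = httpx.get(url, follow_redirects=True)
response.raise_for_status()
print(response.json().get("info"))
```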
quetz/main.py (113 changes: 63 additions & 50 deletions)

@@ -1602,16 +1602,15 @@ async def stop_sync_donwload_counts():
 @app.head("/get/{channel_name}/{path:path}")
 @app.get("/get/{channel_name}/{path:path}")
 def serve_path(
-    path,
+    path: str,
     channel: db_models.Channel = Depends(get_channel_allow_proxy),
     accept_encoding: Optional[str] = Header(None),
-    session=Depends(get_remote_session),
+    session: dict = Depends(get_remote_session),
     dao: Dao = Depends(get_dao),
+    content_type: Optional[str] = Header(None),
 ):
-
-    chunk_size = 10_000
-
     is_package_request = path.endswith((".tar.bz2", ".conda"))
+    is_repodata_request = path.endswith(".json")
 
     package_name = None
     if is_package_request:
@@ -1630,65 +1629,77 @@ def serve_path(
     except ValueError:
         pass
 
-    # if we exclude the package from syncing, redirect to original URL
     if is_package_request and channel.mirror_channel_url:
+        # if we exclude the package from syncing, redirect to original URL
         channel_proxylist = json.loads(channel.channel_metadata).get('proxylist', [])
         if channel_proxylist and package_name and package_name in channel_proxylist:
             return RedirectResponse(f"{channel.mirror_channel_url}/{path}")
 
+    fsize = fmtime = fetag = None
+
     if channel.mirror_channel_url and channel.mirror_mode == "proxy":
         repository = RemoteRepository(channel.mirror_channel_url, session)
-        if not pkgstore.file_exists(channel.name, path):
+        if is_repodata_request:
+            # Invalidate repodata.json and current_repodata.json after channel.ttl seconds
+            try:
+                fsize, fmtime, fetag = pkgstore.get_filemetadata(channel.name, path)
+                cache_miss = time.time() - fmtime >= channel.ttl
+            except FileNotFoundError:
+                cache_miss = True
+        else:
+            cache_miss = not pkgstore.file_exists(channel.name, path)
+        if cache_miss:
             download_remote_file(repository, pkgstore, channel.name, path)
-        elif path.endswith(".json"):
-            # repodata.json and current_repodata.json are cached locally
-            # for channel.ttl seconds
-            _, fmtime, _ = pkgstore.get_filemetadata(channel.name, path)
-            if time.time() - fmtime >= channel.ttl:
-                download_remote_file(repository, pkgstore, channel.name, path)
-
-    if (
-        is_package_request or pkgstore.kind == "LocalStore"
-    ) and pkgstore.support_redirect:
-        return RedirectResponse(pkgstore.url(channel.name, path))
-
-    def iter_chunks(fid):
-        while True:
-            data = fid.read(chunk_size)
-            if not data:
-                break
-            yield data
-
-    if path == "" or path.endswith("/"):
-        path += "index.html"
-    package_content_iter = None
-
-    headers = {}
-    if accept_encoding and 'gzip' in accept_encoding and path.endswith('.json'):
+            fsize = fmtime = fetag = None
+
+    # Redirect response
+    if (is_package_request or is_repodata_request) and pkgstore.support_redirect:
+        return RedirectResponse(
+            pkgstore.url(channel.name, path, content_type=content_type)
+        )

Member (on the new RedirectResponse): Did you confirm that this works? Since it redirects to a path with .gz attached, I am not sure mamba / conda will handle that correctly.

Author: You are probably right; I will have to do more testing. I should have tested more already, because I've also found two other issues. One is that conda sets Content-Type: json, which doesn't work with the GCS signature (I assume you need to sign all the request headers as well, not just the URL?). The other is that if a file is not found, the client gets a 403 from GCS, although a 404 would be more helpful.
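For context on the Content-Type issue raised in this thread: with GCS V4 signed URLs, a content type supplied at signing time becomes part of the signature, so a client that sends a different Content-Type header (conda sends Content-Type: json) gets a 403 instead of the file. A sketch with google-cloud-storage, which gcsfs's sign() appears to build on; bucket and object names are hypothetical:

```python
from datetime import timedelta

from google.cloud import storage

# Hypothetical bucket/object. content_type is folded into the V4 signature,
# so a request whose Content-Type header differs fails the signature check.
client = storage.Client()
blob = client.bucket("quetz-channels").blob("mychannel/linux-64/repodata.json")
signed_url = blob.generate_signed_url(
    version="v4",
    expiration=timedelta(hours=1),
    method="GET",
    content_type="application/json",
)
print(signed_url)
```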

+    # Streaming response
+    if is_repodata_request and accept_encoding and 'gzip' in accept_encoding:
         # return gzipped response
         try:
-            package_content_iter = iter_chunks(
-                pkgstore.serve_path(channel.name, path + '.gz')
-            )
-            path += '.gz'
-            headers['Content-Encoding'] = 'gzip'
-            headers['Content-Type'] = 'application/json'
+            package_content = pkgstore.serve_path(channel.name, path + ".gz")
+            have_gzip = True
         except FileNotFoundError:
-            pass
-
-    while not package_content_iter:
-        try:
-            package_content_iter = iter_chunks(pkgstore.serve_path(channel.name, path))
-        except FileNotFoundError:
-            raise HTTPException(
-                status_code=status.HTTP_404_NOT_FOUND,
-                detail=f"{channel.name}/{path} not found",
-            )
-        except IsADirectoryError:
-            path += "/index.html"
-
-    fsize, fmtime, fetag = pkgstore.get_filemetadata(channel.name, path)
+            have_gzip = False
+    else:
+        have_gzip = False
+
+    if have_gzip:
+        path += '.gz'
+        headers = {
+            'Content-Encoding': 'gzip',
+            'Content-Type': 'application/json',
+        }
+    else:
+
+        def serve_file(path):
+            try:
+                return pkgstore.serve_path(channel.name, path)
+            except FileNotFoundError:
+                raise HTTPException(
+                    status_code=status.HTTP_404_NOT_FOUND,
+                    detail=f"{channel.name}/{path} not found",
+                )
+
+        if path == "" or path.endswith("/"):
+            path += "index.html"
+            package_content = serve_file(path)
+        else:
+            try:
+                package_content = serve_file(path)
+            except IsADirectoryError:
+                path += "/index.html"
+                package_content = serve_file(path)
+        headers = {}
+
+    if fsize is None or fetag is None or fmtime is None:  # check all for mypy
+        # Maybe we already got (fsize, fmtime, fetag) above
+        fsize, fmtime, fetag = pkgstore.get_filemetadata(channel.name, path)
     headers.update(
         {
             'Cache-Control': f'max-age={channel.ttl}',
@@ -1697,6 +1708,8 @@ def iter_chunks(fid):
             'ETag': fetag,
         }
     )
+    chunk_size = 10_000
+    package_content_iter = iter(lambda: package_content.read(chunk_size), b"")
     return StreamingResponse(package_content_iter, headers=headers)
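A side note on the last two added lines: the two-argument iter(callable, sentinel) form replaces the deleted iter_chunks() helper. It calls package_content.read(chunk_size) repeatedly and stops as soon as a read returns the sentinel b"", i.e. at end of file. A self-contained illustration:

```python
import io

# iter(callable, sentinel) keeps calling the callable until it returns the
# sentinel; a binary stream's read() returns b"" exactly at end-of-file.
buf = io.BytesIO(b"x" * 25_000)
chunks = list(iter(lambda: buf.read(10_000), b""))
assert [len(c) for c in chunks] == [10_000, 10_000, 5_000]
```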


quetz/pkgstores.py (29 changes: 20 additions & 9 deletions)

@@ -15,7 +15,7 @@
 from contextlib import contextmanager
 from os import PathLike
 from threading import Lock
-from typing import IO, BinaryIO, List, NoReturn, Tuple, Union
+from typing import IO, BinaryIO, List, NoReturn, Optional, Tuple, Union
 
 import fsspec
 from tenacity import retry, retry_if_exception_type, stop_after_attempt
@@ -73,7 +73,7 @@ def list_files(self, channel: str) -> List[str]:
         pass
 
     @abc.abstractmethod
-    def url(self, channel: str, src: str, expires: int = 0) -> str:
+    def url(self, channel: str, src: str, expires: int = 0, content_type=None) -> str:
         pass
 
     @abc.abstractmethod
@@ -199,7 +199,11 @@ def list_files(self, channel: str):
         channel_dir = os.path.join(self.channels_dir, channel)
         return [os.path.relpath(f, channel_dir) for f in self.fs.find(channel_dir)]
 
-    def url(self, channel: str, src: str, expires=0):
+    def url(
+        self, channel: str, src: str, expires=0, content_type: Optional[str] = None
+    ):
+        # Note: content_type is ignored because it's not needed
+
         if self.redirect_enabled:
             # generate url + secret if necessary
             if self.redirect_secret:
@@ -376,7 +380,11 @@ def remove_prefix(text, prefix):
         with self._get_fs() as fs:
             return [remove_prefix(f, channel_bucket) for f in fs.find(channel_bucket)]
 
-    def url(self, channel: str, src: str, expires=3600):
+    def url(
+        self, channel: str, src: str, expires=3600, content_type: Optional[str] = None
+    ):
+        if content_type is not None:
+            raise NotImplementedError("'content_type' is not supported for S3")
         # expires is in seconds, so the default is 60 minutes!
         with self._get_fs() as fs:
             return fs.url(path.join(self._bucket_map(channel), src), expires)

Author (on the NotImplementedError): TODO: check what to do with S3. I am not sure whether S3 will happily accept non-signed headers or will error out; in the latter case we can't use redirects for repodata.json on S3.
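One way to probe the TODO above, sketched with boto3 directly rather than the s3fs wrapper used in this file (bucket and key are hypothetical). Presigned GET URLs normally sign only the host header, and ResponseContentType only overrides the Content-Type that S3 sends back, so whether an unsolicited request header from conda breaks anything is exactly what needs testing:

```python
import boto3

# Hypothetical bucket/key. ResponseContentType sets the Content-Type of the
# *response*; it does not bind the client's request headers into the signature.
s3 = boto3.client("s3")
url = s3.generate_presigned_url(
    "get_object",
    Params={
        "Bucket": "quetz-channels",
        "Key": "mychannel/linux-64/repodata.json",
        "ResponseContentType": "application/json",
    },
    ExpiresIn=3600,  # seconds
)
print(url)
```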
@@ -517,9 +525,12 @@ def remove_prefix(text, prefix):
             remove_prefix(f, channel_container) for f in fs.find(channel_container)
         ]
 
-    def url(self, channel: str, src: str, expires=3600):
+    def url(
+        self, channel: str, src: str, expires=3600, content_type: Optional[str] = None
+    ):
         # expires is in seconds, so the default is 60 minutes!
         with self._get_fs() as fs:
+            # Note: content_type is ignored because it's not needed
             return fs.url(path.join(self._container_map(channel), src), expires)
 
     def get_filemetadata(self, channel: str, src: str):
@@ -572,8 +583,6 @@ def __init__(self, config):

     @property
     def support_redirect(self):
-        # `gcsfs` currently doesnt support signing yet. Once this is implemented we
-        # can enable this again.
         return True
 
     @contextlib.contextmanager
@@ -668,15 +677,17 @@ def remove_prefix(text, prefix):
             remove_prefix(f, channel_container) for f in fs.find(channel_container)
         ]
 
-    def url(self, channel: str, src: str, expires=3600):
+    def url(self, channel: str, src: str, expires=3600, content_type=None):
         # expires is in seconds, so the default is 60 minutes!
         with self._get_fs() as fs:
             expiration_timestamp = (
                 int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
                 + expires
             )
             redirect_url = fs.sign(
-                path.join(self._bucket_map(channel), src), expiration_timestamp
+                path.join(self._bucket_map(channel), src),
+                expiration_timestamp,
+                content_type=content_type,
             )
             return redirect_url
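For reference, a usage sketch of the new code path (project, bucket, and object path are hypothetical). fs.sign() forwards extra keyword arguments to the underlying signed-URL generation, which is how content_type ends up in the signature:

```python
import datetime

import gcsfs

fs = gcsfs.GCSFileSystem(project="my-project")
expires = 3600
# Mirror GoogleCloudStorageStore.url(): absolute expiration timestamp.
expiration_timestamp = (
    int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()) + expires
)
signed_url = fs.sign(
    "quetz-channels-mychannel/linux-64/repodata.json",
    expiration_timestamp,
    content_type="application/json",
)
print(signed_url)
```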

quetz/tests/api/test_main.py (27 changes: 27 additions & 0 deletions)

@@ -1,11 +1,38 @@
 import datetime
+import io
 from unittest.mock import ANY
 
 import pytest
 
 from quetz.metrics.db_models import PackageVersionMetric
 
 
+def test_index_html(package_version, channel_name, client, mocker):
+    def serve_path(channel_name, path):
+        if path.endswith("index.html"):
+            return io.BytesIO(b"index.html content")
+        else:
+            raise IsADirectoryError
+
+    pkgstore = mocker.Mock()
+    pkgstore.get_filemetadata.return_value = (0, 0, "")
+    pkgstore.url.side_effect = lambda chan, p, **ignored: f"{chan}/{p}"
+    pkgstore.serve_path.side_effect = serve_path
+    mocker.patch("quetz.main.pkgstore", pkgstore)
+
+    for url in [
+        f"/get/{channel_name}",
+        f"/get/{channel_name}/",
+        f"/get/{channel_name}/index.html",
+        f"/get/{channel_name}/linux-64",
+        f"/get/{channel_name}/linux-64/",
+        f"/get/{channel_name}/linux-64/index.html",
+    ]:
+        response = client.get(url, allow_redirects=False)
+        assert response.status_code == 200
+        assert response.text == "index.html content"
+
+
 def test_get_package_list(package_version, package_name, channel_name, client):
 
     response = client.get("/api/dummylogin/bartosz")
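A natural companion to test_index_html would exercise the new redirect branch with the same mocking approach. This is only a sketch, not part of the PR; the mocked URL is hypothetical and the exact redirect status code is left loose:

```python
def test_repodata_redirect(package_version, channel_name, client, mocker):
    # Sketch: with a store that supports redirects, a repodata request should
    # be answered with a redirect to whatever pkgstore.url(...) returns.
    pkgstore = mocker.Mock()
    pkgstore.support_redirect = True
    pkgstore.url.side_effect = (
        lambda chan, p, **ignored: f"https://cdn.example.com/{chan}/{p}"
    )
    mocker.patch("quetz.main.pkgstore", pkgstore)

    response = client.get(
        f"/get/{channel_name}/linux-64/repodata.json", allow_redirects=False
    )
    assert response.status_code in (302, 307)
    assert response.headers["location"].endswith("/linux-64/repodata.json")
```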