Skip to content

Commit

Permalink
Add importlib_resources to lib
Browse files Browse the repository at this point in the history
  • Loading branch information
rembo10 committed Feb 14, 2022
1 parent d89f417 commit 586b9ed
Show file tree
Hide file tree
Showing 10 changed files with 939 additions and 0 deletions.
36 changes: 36 additions & 0 deletions lib/importlib_resources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Read resources contained within a package."""

from ._common import (
as_file,
files,
Package,
)

from ._legacy import (
contents,
open_binary,
read_binary,
open_text,
read_text,
is_resource,
path,
Resource,
)

from importlib_resources.abc import ResourceReader


__all__ = [
'Package',
'Resource',
'ResourceReader',
'as_file',
'contents',
'files',
'is_resource',
'open_binary',
'open_text',
'path',
'read_binary',
'read_text',
]
170 changes: 170 additions & 0 deletions lib/importlib_resources/_adapters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
from contextlib import suppress
from io import TextIOWrapper

from . import abc


class SpecLoaderAdapter:
"""
Adapt a package spec to adapt the underlying loader.
"""

def __init__(self, spec, adapter=lambda spec: spec.loader):
self.spec = spec
self.loader = adapter(spec)

def __getattr__(self, name):
return getattr(self.spec, name)


class TraversableResourcesLoader:
"""
Adapt a loader to provide TraversableResources.
"""

def __init__(self, spec):
self.spec = spec

def get_resource_reader(self, name):
return CompatibilityFiles(self.spec)._native()


def _io_wrapper(file, mode='r', *args, **kwargs):
if mode == 'r':
return TextIOWrapper(file, *args, **kwargs)
elif mode == 'rb':
return file
raise ValueError(
"Invalid mode value '{}', only 'r' and 'rb' are supported".format(mode)
)


class CompatibilityFiles:
"""
Adapter for an existing or non-existent resource reader
to provide a compatibility .files().
"""

class SpecPath(abc.Traversable):
"""
Path tied to a module spec.
Can be read and exposes the resource reader children.
"""

def __init__(self, spec, reader):
self._spec = spec
self._reader = reader

def iterdir(self):
if not self._reader:
return iter(())
return iter(
CompatibilityFiles.ChildPath(self._reader, path)
for path in self._reader.contents()
)

def is_file(self):
return False

is_dir = is_file

def joinpath(self, other):
if not self._reader:
return CompatibilityFiles.OrphanPath(other)
return CompatibilityFiles.ChildPath(self._reader, other)

@property
def name(self):
return self._spec.name

def open(self, mode='r', *args, **kwargs):
return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs)

class ChildPath(abc.Traversable):
"""
Path tied to a resource reader child.
Can be read but doesn't expose any meaningful children.
"""

def __init__(self, reader, name):
self._reader = reader
self._name = name

def iterdir(self):
return iter(())

def is_file(self):
return self._reader.is_resource(self.name)

def is_dir(self):
return not self.is_file()

def joinpath(self, other):
return CompatibilityFiles.OrphanPath(self.name, other)

@property
def name(self):
return self._name

def open(self, mode='r', *args, **kwargs):
return _io_wrapper(
self._reader.open_resource(self.name), mode, *args, **kwargs
)

class OrphanPath(abc.Traversable):
"""
Orphan path, not tied to a module spec or resource reader.
Can't be read and doesn't expose any meaningful children.
"""

def __init__(self, *path_parts):
if len(path_parts) < 1:
raise ValueError('Need at least one path part to construct a path')
self._path = path_parts

def iterdir(self):
return iter(())

def is_file(self):
return False

is_dir = is_file

def joinpath(self, other):
return CompatibilityFiles.OrphanPath(*self._path, other)

@property
def name(self):
return self._path[-1]

def open(self, mode='r', *args, **kwargs):
raise FileNotFoundError("Can't open orphan path")

def __init__(self, spec):
self.spec = spec

@property
def _reader(self):
with suppress(AttributeError):
return self.spec.loader.get_resource_reader(self.spec.name)

def _native(self):
"""
Return the native reader if it supports files().
"""
reader = self._reader
return reader if hasattr(reader, 'files') else self

def __getattr__(self, attr):
return getattr(self._reader, attr)

def files(self):
return CompatibilityFiles.SpecPath(self.spec, self._reader)


def wrap_spec(package):
"""
Construct a package spec with traversable compatibility
on the spec/loader/reader.
"""
return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)
104 changes: 104 additions & 0 deletions lib/importlib_resources/_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os
import pathlib
import tempfile
import functools
import contextlib
import types
import importlib

from typing import Union, Optional
from .abc import ResourceReader, Traversable

from ._compat import wrap_spec

Package = Union[types.ModuleType, str]


def files(package):
# type: (Package) -> Traversable
"""
Get a Traversable resource from a package
"""
return from_package(get_package(package))


def get_resource_reader(package):
# type: (types.ModuleType) -> Optional[ResourceReader]
"""
Return the package's loader if it's a ResourceReader.
"""
# We can't use
# a issubclass() check here because apparently abc.'s __subclasscheck__()
# hook wants to create a weak reference to the object, but
# zipimport.zipimporter does not support weak references, resulting in a
# TypeError. That seems terrible.
spec = package.__spec__
reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore
if reader is None:
return None
return reader(spec.name) # type: ignore


def resolve(cand):
# type: (Package) -> types.ModuleType
return cand if isinstance(cand, types.ModuleType) else importlib.import_module(cand)


def get_package(package):
# type: (Package) -> types.ModuleType
"""Take a package name or module object and return the module.
Raise an exception if the resolved module is not a package.
"""
resolved = resolve(package)
if wrap_spec(resolved).submodule_search_locations is None:
raise TypeError(f'{package!r} is not a package')
return resolved


def from_package(package):
"""
Return a Traversable object for the given package.
"""
spec = wrap_spec(package)
reader = spec.loader.get_resource_reader(spec.name)
return reader.files()


@contextlib.contextmanager
def _tempfile(reader, suffix=''):
# Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
# blocks due to the need to close the temporary file to work on Windows
# properly.
fd, raw_path = tempfile.mkstemp(suffix=suffix)
try:
try:
os.write(fd, reader())
finally:
os.close(fd)
del reader
yield pathlib.Path(raw_path)
finally:
try:
os.remove(raw_path)
except FileNotFoundError:
pass


@functools.singledispatch
def as_file(path):
"""
Given a Traversable object, return that object as a
path on the local file system in a context manager.
"""
return _tempfile(path.read_bytes, suffix=path.name)


@as_file.register(pathlib.Path)
@contextlib.contextmanager
def _(path):
"""
Degenerate behavior for pathlib.Path objects.
"""
yield path
Loading

0 comments on commit 586b9ed

Please sign in to comment.