Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add symbolic link options #482

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions py7zr/py7zr.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ def _extract(
targets: Optional[Collection[str]] = None,
return_dict: bool = False,
callback: Optional[ExtractCallback] = None,
enable_symlink: bool = False,
recursive: Optional[bool] = False,
) -> Optional[dict[str, IO[Any]]]:
if callback is None:
Expand Down Expand Up @@ -609,7 +610,12 @@ def _extract(
elif f.is_socket:
pass # TODO: implement me.
elif f.is_symlink or f.is_junction:
self.worker.register_filelike(f.id, outfilename)
if enable_symlink:
self.worker.register_filelike(f.id, outfilename)
else:
# Archive has symlink or junction
# this has security consequences.
raise ValueError("Archive has symbolic link that is not explicitly enabled.")
else:
self.worker.register_filelike(f.id, outfilename)
target_files.append((outfilename, f.file_properties()))
Expand Down Expand Up @@ -713,9 +719,9 @@ def _write_header(self):
self.sig_header.calccrc(header_len, header_crc)
self.sig_header.write(self.fp)

def _writeall(self, path, arcname):
def _writeall(self, path, arcname, dereference: bool = self.dereference):
try:
if path.is_symlink() and not self.dereference:
if path.is_symlink() and not dereference:
self.write(path, arcname)
elif path.is_file():
self.write(path, arcname)
Expand All @@ -724,14 +730,14 @@ def _writeall(self, path, arcname):
self.write(path, arcname)
for nm in sorted(os.listdir(str(path))):
arc = os.path.join(arcname, nm) if arcname is not None else None
self._writeall(path.joinpath(nm), arc)
self._writeall(path.joinpath(nm), arc, dereference=dereference)
else:
return # pathlib ignores ELOOP and return False for is_*().
except OSError as ose:
if self.dereference and ose.errno in [errno.ELOOP]:
if dereference and ose.errno in [errno.ELOOP]:
return # ignore ELOOP here, this resulted to stop looped symlink reference.
elif self.dereference and sys.platform == "win32" and ose.errno in [errno.ENOENT]:
return # ignore ENOENT which is happened when a case of ELOOP on windows.
elif dereference and sys.platform == "win32" and ose.errno in [errno.ENOENT]:
return # ignore ENOENT which is happened when a case of ELOOP on Windows.
else:
raise

Expand Down Expand Up @@ -1020,13 +1026,15 @@ def readall(self) -> Optional[dict[str, IO[Any]]]:
self._dict = {}
return self._extract(path=None, return_dict=True)

def extractall(self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None) -> None:
def extractall(
self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None, enable_symlink: bool = True
) -> None:
"""Extract all members from the archive to the current working
directory and set owner, modification time and permissions on
directories afterwards. ``path`` specifies a different directory
to extract to.
"""
self._extract(path=path, return_dict=False, callback=callback)
self._extract(path=path, return_dict=False, callback=callback, enable_symlink=enable_symlink)

def read(self, targets: Optional[Collection[str]] = None) -> Optional[dict[str, IO[Any]]]:
if not self._is_none_or_collection(targets):
Expand All @@ -1039,15 +1047,15 @@ def read(self, targets: Optional[Collection[str]] = None) -> Optional[dict[str,
return self._extract(path=None, targets=targets, return_dict=True)

def extract(
self, path: Optional[Any] = None, targets: Optional[Collection[str]] = None, recursive: Optional[bool] = False
) -> None:
self, path: Optional[Any] = None, targets: Optional[Collection[str]] = None, recursive: Optional[bool] = False,
enable_symlink: bool = True) -> None:
if not self._is_none_or_collection(targets):
raise TypeError("Wrong argument type given.")
# For interoperability with ZipFile, we strip any trailing slashes
# This also matches the behavior of TarFile
if targets is not None:
targets = [remove_trailing_slash(target) for target in targets]
self._extract(path, targets, return_dict=False, recursive=recursive)
self._extract(path, targets, return_dict=False, recursive=recursive, enable_symlink=enable_symlink)

def reporter(self, callback: ExtractCallback):
while True:
Expand All @@ -1074,18 +1082,18 @@ def reporter(self, callback: ExtractCallback):
pass
self.q.task_done()

def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None):
def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None, dereference: bool = self.dereference):
"""Write files in target path into archive."""
if isinstance(path, str):
path = pathlib.Path(path)
if not path.exists():
raise ValueError("specified path does not exist.")
if path.is_dir() or path.is_file():
self._writeall(path, arcname)
self._writeall(path, arcname, dereference)
else:
raise ValueError("specified path is not a directory or a file")

def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None):
def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None, dereference: bool = self.dereference):
"""Write single target file into archive."""
if not isinstance(file, str) and not isinstance(file, pathlib.Path):
raise ValueError("Unsupported file type.")
Expand All @@ -1098,11 +1106,11 @@ def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None):
else:
path = file
folder = self.header.initialize()
file_info = self._make_file_info(path, arcname, self.dereference)
file_info = self._make_file_info(path, arcname, dereference)
self.header.files_info.files.append(file_info)
self.header.files_info.emptyfiles.append(file_info["emptystream"])
self.files.append(file_info)
self.worker.archive(self.fp, self.files, folder, deref=self.dereference)
self.worker.archive(self.fp, self.files, folder, deref=dereference)

def writed(self, targets: dict[str, IO[Any]]) -> None:
for target, input in targets.items():
Expand Down
Loading