Skip to content

Commit

Permalink
Add symbolic link options
Browse files Browse the repository at this point in the history
- Add enable_symlink boolean option for read, extractall, and extract
- Add dereference boolean option for write and writeall
  - SevenZipFile constructor option dereference is marked deprecated
     and will be removed in future version

Signed-off-by: Hiroshi Miura <[email protected]>
  • Loading branch information
miurahr committed Oct 30, 2022
1 parent 04e3af5 commit 97b90c6
Showing 1 changed file with 22 additions and 16 deletions.
38 changes: 22 additions & 16 deletions py7zr/py7zr.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ def _extract(
targets: Optional[List[str]] = None,
return_dict: bool = False,
callback: Optional[ExtractCallback] = None,
enable_symlink: bool = False,
) -> Optional[Dict[str, IO[Any]]]:
if callback is None:
pass
Expand Down Expand Up @@ -616,7 +617,12 @@ def _extract(
elif f.is_socket:
pass # TODO: implement me.
elif f.is_symlink or f.is_junction:
self.worker.register_filelike(f.id, outfilename)
if enable_symlink:
self.worker.register_filelike(f.id, outfilename)
else:
# Archive has symlink or junction
# this has security consequences.
raise ValueError("Archive has symbolic link that is not explicitly enabled.")
else:
self.worker.register_filelike(f.id, outfilename)
target_files.append((outfilename, f.file_properties()))
Expand Down Expand Up @@ -717,9 +723,9 @@ def _write_header(self):
self.sig_header.calccrc(header_len, header_crc)
self.sig_header.write(self.fp)

def _writeall(self, path, arcname):
def _writeall(self, path, arcname, dereference: bool = False):
try:
if path.is_symlink() and not self.dereference:
if path.is_symlink() and not dereference:
self.write(path, arcname)
elif path.is_file():
self.write(path, arcname)
Expand All @@ -728,14 +734,14 @@ def _writeall(self, path, arcname):
self.write(path, arcname)
for nm in sorted(os.listdir(str(path))):
arc = os.path.join(arcname, nm) if arcname is not None else None
self._writeall(path.joinpath(nm), arc)
self._writeall(path.joinpath(nm), arc, dereference=dereference)
else:
return # pathlib ignores ELOOP and return False for is_*().
except OSError as ose:
if self.dereference and ose.errno in [errno.ELOOP]:
if dereference and ose.errno in [errno.ELOOP]:
return # ignore ELOOP here, this resulted to stop looped symlink reference.
elif self.dereference and sys.platform == "win32" and ose.errno in [errno.ENOENT]:
return # ignore ENOENT which is happened when a case of ELOOP on windows.
elif dereference and sys.platform == "win32" and ose.errno in [errno.ENOENT]:
return # ignore ENOENT which is happened when a case of ELOOP on Windows.
else:
raise

Expand Down Expand Up @@ -971,20 +977,20 @@ def readall(self) -> Optional[Dict[str, IO[Any]]]:
self._dict = {}
return self._extract(path=None, return_dict=True)

def extractall(self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None) -> None:
def extractall(self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None, enable_symlink: bool = False) -> None:
"""Extract all members from the archive to the current working
directory and set owner, modification time and permissions on
directories afterwards. ``path`` specifies a different directory
to extract to.
"""
self._extract(path=path, return_dict=False, callback=callback)
self._extract(path=path, return_dict=False, callback=callback, enable_symlink=enable_symlink)

def read(self, targets: Optional[List[str]] = None) -> Optional[Dict[str, IO[Any]]]:
self._dict = {}
return self._extract(path=None, targets=targets, return_dict=True)

def extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None) -> None:
self._extract(path, targets, return_dict=False)
def extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None, enable_symlink: bool = False) -> None:
self._extract(path, targets, return_dict=False, enable_symlink=enable_symlink)

def reporter(self, callback: ExtractCallback):
while True:
Expand All @@ -1009,18 +1015,18 @@ def reporter(self, callback: ExtractCallback):
pass
self.q.task_done()

def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None):
def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None, dereference: bool = False):
"""Write files in target path into archive."""
if isinstance(path, str):
path = pathlib.Path(path)
if not path.exists():
raise ValueError("specified path does not exist.")
if path.is_dir() or path.is_file():
self._writeall(path, arcname)
self._writeall(path, arcname, dereference or self.dereference)
else:
raise ValueError("specified path is not a directory or a file")

def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None):
def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None, dereference: bool = False):
"""Write single target file into archive."""
if isinstance(file, str):
path = pathlib.Path(file)
Expand All @@ -1029,11 +1035,11 @@ def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None):
else:
raise ValueError("Unsupported file type.")
folder = self.header.initialize()
file_info = self._make_file_info(path, arcname, self.dereference)
file_info = self._make_file_info(path, arcname, dereference or self.dereference)
self.header.files_info.files.append(file_info)
self.header.files_info.emptyfiles.append(file_info["emptystream"])
self.files.append(file_info)
self.worker.archive(self.fp, self.files, folder, deref=self.dereference)
self.worker.archive(self.fp, self.files, folder, deref=dereference or self.dereference)

def writed(self, targets: Dict[str, IO[Any]]) -> None:
for target, input in targets.items():
Expand Down

0 comments on commit 97b90c6

Please sign in to comment.