diff --git a/py7zr/py7zr.py b/py7zr/py7zr.py index 2a151f0c..cd35c224 100644 --- a/py7zr/py7zr.py +++ b/py7zr/py7zr.py @@ -527,6 +527,7 @@ def _extract( targets: Optional[List[str]] = None, return_dict: bool = False, callback: Optional[ExtractCallback] = None, + enable_symlink: bool = False, ) -> Optional[Dict[str, IO[Any]]]: if callback is None: pass @@ -616,7 +617,12 @@ def _extract( elif f.is_socket: pass # TODO: implement me. elif f.is_symlink or f.is_junction: - self.worker.register_filelike(f.id, outfilename) + if enable_symlink: + self.worker.register_filelike(f.id, outfilename) + else: + # Archive has symlink or junction + # this has security consequences. + raise ValueError("Archive has symbolic link that is not explicitly enabled.") else: self.worker.register_filelike(f.id, outfilename) target_files.append((outfilename, f.file_properties())) @@ -717,9 +723,9 @@ def _write_header(self): self.sig_header.calccrc(header_len, header_crc) self.sig_header.write(self.fp) - def _writeall(self, path, arcname): + def _writeall(self, path, arcname, dereference: bool = False): try: - if path.is_symlink() and not self.dereference: + if path.is_symlink() and not dereference: self.write(path, arcname) elif path.is_file(): self.write(path, arcname) @@ -728,14 +734,14 @@ def _writeall(self, path, arcname): self.write(path, arcname) for nm in sorted(os.listdir(str(path))): arc = os.path.join(arcname, nm) if arcname is not None else None - self._writeall(path.joinpath(nm), arc) + self._writeall(path.joinpath(nm), arc, dereference=dereference) else: return # pathlib ignores ELOOP and return False for is_*(). except OSError as ose: - if self.dereference and ose.errno in [errno.ELOOP]: + if dereference and ose.errno in [errno.ELOOP]: return # ignore ELOOP here, this resulted to stop looped symlink reference. - elif self.dereference and sys.platform == "win32" and ose.errno in [errno.ENOENT]: - return # ignore ENOENT which is happened when a case of ELOOP on windows. + elif dereference and sys.platform == "win32" and ose.errno in [errno.ENOENT]: + return # ignore ENOENT which is happened when a case of ELOOP on Windows. else: raise @@ -971,20 +977,20 @@ def readall(self) -> Optional[Dict[str, IO[Any]]]: self._dict = {} return self._extract(path=None, return_dict=True) - def extractall(self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None) -> None: + def extractall(self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None, enable_symlink: bool = False) -> None: """Extract all members from the archive to the current working directory and set owner, modification time and permissions on directories afterwards. ``path`` specifies a different directory to extract to. """ - self._extract(path=path, return_dict=False, callback=callback) + self._extract(path=path, return_dict=False, callback=callback, enable_symlink=enable_symlink) def read(self, targets: Optional[List[str]] = None) -> Optional[Dict[str, IO[Any]]]: self._dict = {} return self._extract(path=None, targets=targets, return_dict=True) - def extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None) -> None: - self._extract(path, targets, return_dict=False) + def extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None, enable_symlink: bool = False) -> None: + self._extract(path, targets, return_dict=False, enable_symlink=enable_symlink) def reporter(self, callback: ExtractCallback): while True: @@ -1009,18 +1015,18 @@ def reporter(self, callback: ExtractCallback): pass self.q.task_done() - def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None): + def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None, dereference: bool = False): """Write files in target path into archive.""" if isinstance(path, str): path = pathlib.Path(path) if not path.exists(): raise ValueError("specified path does not exist.") if path.is_dir() or path.is_file(): - self._writeall(path, arcname) + self._writeall(path, arcname, dereference or self.dereference) else: raise ValueError("specified path is not a directory or a file") - def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None): + def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None, dereference: bool = False): """Write single target file into archive.""" if isinstance(file, str): path = pathlib.Path(file) @@ -1029,11 +1035,11 @@ def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None): else: raise ValueError("Unsupported file type.") folder = self.header.initialize() - file_info = self._make_file_info(path, arcname, self.dereference) + file_info = self._make_file_info(path, arcname, dereference or self.dereference) self.header.files_info.files.append(file_info) self.header.files_info.emptyfiles.append(file_info["emptystream"]) self.files.append(file_info) - self.worker.archive(self.fp, self.files, folder, deref=self.dereference) + self.worker.archive(self.fp, self.files, folder, deref=dereference or self.dereference) def writed(self, targets: Dict[str, IO[Any]]) -> None: for target, input in targets.items():