Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

creation_time regression between 1.11 and 1.14 #296

Open
Laurent-Van-Miegroet opened this issue Oct 22, 2024 · 1 comment
Open

creation_time regression between 1.11 and 1.14 #296

Laurent-Van-Miegroet opened this issue Oct 22, 2024 · 1 comment

Comments

@Laurent-Van-Miegroet
Copy link

Laurent-Van-Miegroet commented Oct 22, 2024

Hi,

Using the below code:

from smbclient import ClientConfig, register_session, reset_connection_cache, scandir


class FileEntry(object):
    def __init__(self, path: str, file_directory_info: Dict[str, Any]):
        self.name = file_directory_info["file_name"].value.decode("utf-16-le")
        self.path = path
        self.ctime = file_directory_info["creation_time"].value
        self.atime = file_directory_info["last_access_time"].value
        self.wtime = file_directory_info["last_write_time"].value
        self.size = file_directory_info["allocation_size"].value
        self.attributes = file_directory_info["file_attributes"].value
        self.is_archive = self._flag_set(FileAttributes.FILE_ATTRIBUTE_ARCHIVE)
        self.is_compressed = self._flag_set(FileAttributes.FILE_ATTRIBUTE_COMPRESSED)
        self.is_directory = self._flag_set(FileAttributes.FILE_ATTRIBUTE_DIRECTORY)
        self.is_hidden = self._flag_set(FileAttributes.FILE_ATTRIBUTE_HIDDEN)
        self.is_normal = self._flag_set(FileAttributes.FILE_ATTRIBUTE_NORMAL)
        self.is_readonly = self._flag_set(FileAttributes.FILE_ATTRIBUTE_READONLY)
        self.is_reparse_point = self._flag_set(FileAttributes.FILE_ATTRIBUTE_REPARSE_POINT)
        self.is_system = self._flag_set(FileAttributes.FILE_ATTRIBUTE_SYSTEM)
        self.is_temporary = self._flag_set(FileAttributes.FILE_ATTRIBUTE_TEMPORARY)

        """
                self.name = file_directory_info.file_name
                self.path = path
                self.ctime = file_directory_info.creation_time
                self.atime = file_directory_info.last_access_time
                self.wtime = file_directory_info.last_write_time
                self.size = file_directory_info.allocation_size
                self.attributes = file_directory_info.file_attributes
                self.is_archive = self._flag_set(FileAttributes.FILE_ATTRIBUTE_ARCHIVE)
                self.is_compressed = self._flag_set(FileAttributes.FILE_ATTRIBUTE_COMPRESSED)
                self.is_directory = self._flag_set(FileAttributes.FILE_ATTRIBUTE_DIRECTORY)
                self.is_hidden = self._flag_set(FileAttributes.FILE_ATTRIBUTE_HIDDEN)
                self.is_normal = self._flag_set(FileAttributes.FILE_ATTRIBUTE_NORMAL)
                self.is_readonly = self._flag_set(FileAttributes.FILE_ATTRIBUTE_READONLY)
                self.is_reparse_point = self._flag_set(FileAttributes.FILE_ATTRIBUTE_REPARSE_POINT)
                self.is_system = self._flag_set(FileAttributes.FILE_ATTRIBUTE_SYSTEM)
                self.is_temporary = self._flag_set(FileAttributes.FILE_ATTRIBUTE_TEMPORARY)
        """
    def __str__(self) -> str:
        return f"{self.name} {self.path} size: {self.size} write_time {self.wtime}"

    def _flag_set(self, attribute) -> bool:
        """

        :param attribute:
        :return:
        """
        return self.attributes & attribute == attribute

def _listdir(
        self, path: str, pattern: str, recurse: bool, exclude_dir: Optional[List[str]] = None
    ) -> List[FileEntry]:
        """
        List the contents of a directory
        :param path:
        :param pattern:
        :param recurse:
        :param exclude_dir:
        :return:
        """
        exclude_dir = [] if exclude_dir is None else exclude_dir
        entries = []
        for file_info in scandir(path, pattern):
            if file_info.is_file():
                fe = FileEntry(file_info.path, file_info._dir_info)
                entries.append(fe)
            elif file_info.is_dir() and recurse:
                skip = False
                for pat in exclude_dir:
                    if fnmatch.fnmatch(file_info.path, pat):
                        skip = True
                        break
                if not skip:
                    entries += self._listdir(file_info.path, pattern, recurse, exclude_dir)
            else:
                self.logger.warning(f"Symlink {file_info.name} is ignored.")
        return entries


def listdir(self, path: str, recurse: bool = False, exclude_dir: Optional[List[str]] = None) -> List[FileEntry]:
        """
        List the files and folders in an SMB path and its attributes.

        :param path: The full SMB share to list, this can be <backslash><backslash>server\share with an optional path added to
                        the end, <backslash><backslash>server\ can be omitted
        :param recurse: Whether to search recursively or just the top level.
        :param exclude_dir: A list of glob like pattern to exclude directories
        :return: A list of FileEntry objects
        """  # noqa W605

        # remove server if given:
        exclude_dir = [] if exclude_dir is None else exclude_dir
        if path.startswith(f"\\{self.server}"):
            path = path.replace(f"\\{self.server}", "")

        path_split = [e for e in path.split("\\") if e]

        if len(path_split) < 2:
            raise Exception("Path should specify at least the server and share to connect to.")

        share = path_split[0]
        share_path = "\\".join(path_split[1:])  # if len(path_split) > 2 else ""

        tree_path = r"\\%s\%s\%s" % (self.server, share, share_path)
        self.logger.info(f"Scanning {tree_path}")
        ret = []
        try:
            ret = self._listdir(tree_path, "*", recurse, exclude_dir=exclude_dir)
        except Exception as e:
            raise e

        return ret

files = listdir(rf"{server_path}", recurse=True, exclude_dir=exclude_dir)
for f in files:
    file_utc = f.wtime.astimezone(pytz.UTC).replace(tzinfo=None)
    print("{f.name} modified on {file_utc }")

I get:

Using python 3.9, smbprotocol 1.11.0 I get this

FTV;0155.88.cdcs.gp3.xml modified on 2024-10-21 09:25:19.848814

Using python 3.9, smbprotocal 1.14.0 I get this:

FTV;0155.88.cdcs.gp3.xml modified on 2024-10-21 11:25:19.848814

There is a 2h difference that sounds like a UTC issue. I'm in Paris timezone, and file modification date on windows explorer is 1:25PM. So it seems version 1.14.0 is the correct one but what is the change in the code that trigger this difference?

@jborean93
Copy link
Owner

Most likely the change was from https://github.com/jborean93/smbprotocol/pull/252/files where the datetime object went from a naive DT to one based on the UTC timezone which is what a FILETIME value is.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants