Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix miscellaneous bugs in fs.wrap #470

Merged
merged 6 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
arguments for the `write` and `writelines` methods, as expected by their base class [`io.RawIOBase`](https://docs.python.org/3/library/io.html#io.RawIOBase).
- Various documentation issues, including `MemoryFS` docstring not rendering properly.
- Avoid creating a new connection on every call of `FTPFS.upload`. Closes [#455](https://github.com/PyFilesystem/pyfilesystem2/issues/455).
- `WrapReadOnly.removetree` not raising a `ResourceReadOnly` when called. Closes [#468](https://github.com/PyFilesystem/pyfilesystem2/issues/468).
- `WrapCachedDir.isdir` and `WrapCachedDir.isfile` raising a `ResourceNotFound` error on non-existing path ([#470](https://github.com/PyFilesystem/pyfilesystem2/pull/470)).


## [2.4.12] - 2021-01-14
Expand Down
33 changes: 29 additions & 4 deletions fs/wrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ class WrapCachedDir(WrapFS[_F], typing.Generic[_F]):

"""

# FIXME (@althonos): The caching data structure can very likely be
# improved. With the current implementation, if `scandir` result was
# cached for `namespaces=["details", "access"]`, calling `scandir`
# again only with `names=["details"]` will miss the cache, even though
# we are already storing the totality of the required metadata.
#
# A possible solution would be to replaced the cached with a
# Dict[Text, Dict[Text, Dict[Text, Info]]]
# ^ ^ ^ ^-- the actual info object
# | | \-- the path of the directory entry
# | \-- the namespace of the info
# \-- the cached directory entry
#
# Furthermore, `listdir` and `filterdir` calls should be cached as well,
# since they can be written as wrappers of `scandir`.

wrap_name = "cached-dir"

def __init__(self, wrap_fs): # noqa: D107
Expand Down Expand Up @@ -135,13 +151,17 @@ def getinfo(self, path, namespaces=None):

def isdir(self, path):
# type: (Text) -> bool
# FIXME(@althonos): this raises an error on non-existing file !
return self.getinfo(path).is_dir
try:
return self.getinfo(path).is_dir
except ResourceNotFound:
return False

def isfile(self, path):
# type: (Text) -> bool
# FIXME(@althonos): this raises an error on non-existing file !
return not self.getinfo(path).is_dir
try:
return not self.getinfo(path).is_dir
except ResourceNotFound:
return False


class WrapReadOnly(WrapFS[_F], typing.Generic[_F]):
Expand Down Expand Up @@ -203,6 +223,11 @@ def removedir(self, path):
self.check()
raise ResourceReadOnly(path)

def removetree(self, path):
# type: (Text) -> None
self.check()
raise ResourceReadOnly(path)

def setinfo(self, path, info):
# type: (Text, RawInfo) -> None
self.check()
Expand Down
252 changes: 187 additions & 65 deletions tests/test_wrap.py
Original file line number Diff line number Diff line change
@@ -1,97 +1,219 @@
from __future__ import unicode_literals

import operator
import unittest

from fs import errors
try:
from unittest import mock
except ImportError:
import mock

import six

import fs.copy
import fs.mirror
import fs.move
import fs.wrap
import fs.errors
from fs import open_fs
from fs import wrap
from fs.info import Info


class TestWrap(unittest.TestCase):
def test_readonly(self):
mem_fs = open_fs("mem://")
fs = wrap.read_only(mem_fs)
class TestWrapReadOnly(unittest.TestCase):
def setUp(self):
self.fs = open_fs("mem://")
self.ro = fs.wrap.read_only(self.fs)

with self.assertRaises(errors.ResourceReadOnly):
fs.open("foo", "w")
def tearDown(self):
self.fs.close()

with self.assertRaises(errors.ResourceReadOnly):
fs.appendtext("foo", "bar")
def assertReadOnly(self, func, *args, **kwargs):
self.assertRaises(fs.errors.ResourceReadOnly, func, *args, **kwargs)

with self.assertRaises(errors.ResourceReadOnly):
fs.appendbytes("foo", b"bar")
def test_open_w(self):
self.assertReadOnly(self.ro.open, "foo", "w")

with self.assertRaises(errors.ResourceReadOnly):
fs.makedir("foo")
def test_appendtext(self):
self.assertReadOnly(self.ro.appendtext, "foo", "bar")

with self.assertRaises(errors.ResourceReadOnly):
fs.move("foo", "bar")
def test_appendbytes(self):
self.assertReadOnly(self.ro.appendbytes, "foo", b"bar")

with self.assertRaises(errors.ResourceReadOnly):
fs.openbin("foo", "w")
def test_makedir(self):
self.assertReadOnly(self.ro.makedir, "foo")

with self.assertRaises(errors.ResourceReadOnly):
fs.remove("foo")
def test_move(self):
self.assertReadOnly(self.ro.move, "foo", "bar")

with self.assertRaises(errors.ResourceReadOnly):
fs.removedir("foo")
def test_openbin_w(self):
self.assertReadOnly(self.ro.openbin, "foo", "w")

with self.assertRaises(errors.ResourceReadOnly):
fs.setinfo("foo", {})
def test_remove(self):
self.assertReadOnly(self.ro.remove, "foo")

with self.assertRaises(errors.ResourceReadOnly):
fs.settimes("foo", {})
def test_removedir(self):
self.assertReadOnly(self.ro.removedir, "foo")

with self.assertRaises(errors.ResourceReadOnly):
fs.copy("foo", "bar")
def test_removetree(self):
self.assertReadOnly(self.ro.removetree, "foo")

with self.assertRaises(errors.ResourceReadOnly):
fs.create("foo")
def test_setinfo(self):
self.assertReadOnly(self.ro.setinfo, "foo", {})

with self.assertRaises(errors.ResourceReadOnly):
fs.writetext("foo", "bar")
def test_settimes(self):
self.assertReadOnly(self.ro.settimes, "foo", {})

with self.assertRaises(errors.ResourceReadOnly):
fs.writebytes("foo", b"bar")
def test_copy(self):
self.assertReadOnly(self.ro.copy, "foo", "bar")

with self.assertRaises(errors.ResourceReadOnly):
fs.makedirs("foo/bar")
def test_create(self):
self.assertReadOnly(self.ro.create, "foo")

with self.assertRaises(errors.ResourceReadOnly):
fs.touch("foo")
def test_writetext(self):
self.assertReadOnly(self.ro.writetext, "foo", "bar")

with self.assertRaises(errors.ResourceReadOnly):
fs.upload("foo", None)
def test_writebytes(self):
self.assertReadOnly(self.ro.writebytes, "foo", b"bar")

with self.assertRaises(errors.ResourceReadOnly):
fs.writefile("foo", None)
def test_makedirs(self):
self.assertReadOnly(self.ro.makedirs, "foo/bar")

self.assertTrue(mem_fs.isempty("/"))
mem_fs.writebytes("file", b"read me")
with fs.openbin("file") as read_file:
self.assertEqual(read_file.read(), b"read me")
def test_touch(self):
self.assertReadOnly(self.ro.touch, "foo")

with fs.open("file", "rb") as read_file:
self.assertEqual(read_file.read(), b"read me")
def test_upload(self):
self.assertReadOnly(self.ro.upload, "foo", six.BytesIO())

def test_cachedir(self):
mem_fs = open_fs("mem://")
mem_fs.makedirs("foo/bar/baz")
mem_fs.touch("egg")
def test_writefile(self):
self.assertReadOnly(self.ro.writefile, "foo", six.StringIO())

fs = wrap.cache_directory(mem_fs)
self.assertEqual(sorted(fs.listdir("/")), ["egg", "foo"])
self.assertEqual(sorted(fs.listdir("/")), ["egg", "foo"])
self.assertTrue(fs.isdir("foo"))
self.assertTrue(fs.isdir("foo"))
self.assertTrue(fs.isfile("egg"))
self.assertTrue(fs.isfile("egg"))
def test_openbin_r(self):
self.fs.writebytes("file", b"read me")
with self.ro.openbin("file") as read_file:
self.assertEqual(read_file.read(), b"read me")

self.assertEqual(fs.getinfo("foo"), mem_fs.getinfo("foo"))
self.assertEqual(fs.getinfo("foo"), mem_fs.getinfo("foo"))
def test_open_r(self):
self.fs.writebytes("file", b"read me")
with self.ro.open("file", "rb") as read_file:
self.assertEqual(read_file.read(), b"read me")

self.assertEqual(fs.getinfo("/"), mem_fs.getinfo("/"))
self.assertEqual(fs.getinfo("/"), mem_fs.getinfo("/"))

with self.assertRaises(errors.ResourceNotFound):
fs.getinfo("/foofoo")
class TestWrapReadOnlySyspath(unittest.TestCase):
# If the wrapped fs has a syspath, there is a chance that somewhere
# in fs.copy or fs.mirror we try to use it to our advantage, but
# we want to make sure these implementations don't circumvent the
# wrapper.

def setUp(self):
self.fs = open_fs("temp://")
self.ro = fs.wrap.read_only(self.fs)
self.src = open_fs("temp://")
self.src.touch("foo")
self.src.makedir("bar")

def tearDown(self):
self.fs.close()
self.src.close()

def assertReadOnly(self, func, *args, **kwargs):
self.assertRaises(fs.errors.ResourceReadOnly, func, *args, **kwargs)

def test_copy_fs(self):
self.assertReadOnly(fs.copy.copy_fs, self.src, self.ro)

def test_copy_fs_if_newer(self):
self.assertReadOnly(fs.copy.copy_fs_if_newer, self.src, self.ro)

def test_copy_file(self):
self.assertReadOnly(fs.copy.copy_file, self.src, "foo", self.ro, "foo")

def test_copy_file_if_newer(self):
self.assertReadOnly(fs.copy.copy_file_if_newer, self.src, "foo", self.ro, "foo")

def test_copy_structure(self):
self.assertReadOnly(fs.copy.copy_structure, self.src, self.ro)

def test_mirror(self):
self.assertReadOnly(fs.mirror.mirror, self.src, self.ro)
fs.mirror.mirror(self.src, self.fs)
self.fs.touch("baz")
self.assertReadOnly(fs.mirror.mirror, self.src, self.ro)

def test_move_fs(self):
self.assertReadOnly(fs.move.move_fs, self.src, self.ro)
self.src.removetree("/")
self.fs.touch("foo")
self.assertReadOnly(fs.move.move_fs, self.ro, self.src)

def test_move_file(self):
self.assertReadOnly(fs.move.move_file, self.src, "foo", self.ro, "foo")
self.fs.touch("baz")
self.assertReadOnly(fs.move.move_file, self.ro, "baz", self.src, "foo")

def test_move_dir(self):
self.assertReadOnly(fs.move.move_file, self.src, "bar", self.ro, "bar")
self.fs.makedir("baz")
self.assertReadOnly(fs.move.move_dir, self.ro, "baz", self.src, "baz")


class TestWrapCachedDir(unittest.TestCase):
def setUp(self):
self.fs = open_fs("mem://")
self.fs.makedirs("foo/bar/baz")
self.fs.touch("egg")
self.cached = fs.wrap.cache_directory(self.fs)

def tearDown(self):
self.fs.close()

def assertNotFound(self, func, *args, **kwargs):
self.assertRaises(fs.errors.ResourceNotFound, func, *args, **kwargs)

def test_scandir(self):
key = operator.attrgetter("name")
expected = [
Info({"basic": {"name": "egg", "is_dir": False}}),
Info({"basic": {"name": "foo", "is_dir": True}}),
]
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir:
self.assertEqual(sorted(self.cached.scandir("/"), key=key), expected)
scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)])
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir:
self.assertEqual(sorted(self.cached.scandir("/"), key=key), expected)
scandir.assert_not_called()

def test_isdir(self):
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir:
self.assertTrue(self.cached.isdir("foo"))
self.assertFalse(self.cached.isdir("egg")) # is file
self.assertFalse(self.cached.isdir("spam")) # doesn't exist
scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)])
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir:
self.assertTrue(self.cached.isdir("foo"))
self.assertFalse(self.cached.isdir("egg"))
self.assertFalse(self.cached.isdir("spam"))
scandir.assert_not_called()

def test_isfile(self):
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir:
self.assertTrue(self.cached.isfile("egg"))
self.assertFalse(self.cached.isfile("foo")) # is dir
self.assertFalse(self.cached.isfile("spam")) # doesn't exist
scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)])
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir:
self.assertTrue(self.cached.isfile("egg"))
self.assertFalse(self.cached.isfile("foo"))
self.assertFalse(self.cached.isfile("spam"))
scandir.assert_not_called()

def test_getinfo(self):
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir:
self.assertEqual(self.cached.getinfo("foo"), self.fs.getinfo("foo"))
self.assertEqual(self.cached.getinfo("/"), self.fs.getinfo("/"))
self.assertNotFound(self.cached.getinfo, "spam")
scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)])
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir:
self.assertEqual(self.cached.getinfo("foo"), self.fs.getinfo("foo"))
self.assertEqual(self.cached.getinfo("/"), self.fs.getinfo("/"))
self.assertNotFound(self.cached.getinfo, "spam")
scandir.assert_not_called()