diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dfe9058..0b80ff81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). arguments for the `write` and `writelines` methods, as expected by their base class [`io.RawIOBase`](https://docs.python.org/3/library/io.html#io.RawIOBase). - Various documentation issues, including `MemoryFS` docstring not rendering properly. - Avoid creating a new connection on every call of `FTPFS.upload`. Closes [#455](https://github.com/PyFilesystem/pyfilesystem2/issues/455). +- `WrapReadOnly.removetree` not raising a `ResourceReadOnly` when called. Closes [#468](https://github.com/PyFilesystem/pyfilesystem2/issues/468). +- `WrapCachedDir.isdir` and `WrapCachedDir.isfile` raising a `ResourceNotFound` error on non-existing path ([#470](https://github.com/PyFilesystem/pyfilesystem2/pull/470)). ## [2.4.12] - 2021-01-14 diff --git a/fs/wrap.py b/fs/wrap.py index 8685e9f6..3ae4aa9f 100644 --- a/fs/wrap.py +++ b/fs/wrap.py @@ -92,6 +92,22 @@ class WrapCachedDir(WrapFS[_F], typing.Generic[_F]): """ + # FIXME (@althonos): The caching data structure can very likely be + # improved. With the current implementation, if `scandir` result was + # cached for `namespaces=["details", "access"]`, calling `scandir` + # again only with `names=["details"]` will miss the cache, even though + # we are already storing the totality of the required metadata. + # + # A possible solution would be to replaced the cached with a + # Dict[Text, Dict[Text, Dict[Text, Info]]] + # ^ ^ ^ ^-- the actual info object + # | | \-- the path of the directory entry + # | \-- the namespace of the info + # \-- the cached directory entry + # + # Furthermore, `listdir` and `filterdir` calls should be cached as well, + # since they can be written as wrappers of `scandir`. + wrap_name = "cached-dir" def __init__(self, wrap_fs): # noqa: D107 @@ -135,13 +151,17 @@ def getinfo(self, path, namespaces=None): def isdir(self, path): # type: (Text) -> bool - # FIXME(@althonos): this raises an error on non-existing file ! - return self.getinfo(path).is_dir + try: + return self.getinfo(path).is_dir + except ResourceNotFound: + return False def isfile(self, path): # type: (Text) -> bool - # FIXME(@althonos): this raises an error on non-existing file ! - return not self.getinfo(path).is_dir + try: + return not self.getinfo(path).is_dir + except ResourceNotFound: + return False class WrapReadOnly(WrapFS[_F], typing.Generic[_F]): @@ -203,6 +223,11 @@ def removedir(self, path): self.check() raise ResourceReadOnly(path) + def removetree(self, path): + # type: (Text) -> None + self.check() + raise ResourceReadOnly(path) + def setinfo(self, path, info): # type: (Text, RawInfo) -> None self.check() diff --git a/tests/test_wrap.py b/tests/test_wrap.py index 4d5cc8c4..89a91187 100644 --- a/tests/test_wrap.py +++ b/tests/test_wrap.py @@ -1,97 +1,219 @@ from __future__ import unicode_literals +import operator import unittest -from fs import errors +try: + from unittest import mock +except ImportError: + import mock + +import six + +import fs.copy +import fs.mirror +import fs.move +import fs.wrap +import fs.errors from fs import open_fs -from fs import wrap +from fs.info import Info -class TestWrap(unittest.TestCase): - def test_readonly(self): - mem_fs = open_fs("mem://") - fs = wrap.read_only(mem_fs) +class TestWrapReadOnly(unittest.TestCase): + def setUp(self): + self.fs = open_fs("mem://") + self.ro = fs.wrap.read_only(self.fs) - with self.assertRaises(errors.ResourceReadOnly): - fs.open("foo", "w") + def tearDown(self): + self.fs.close() - with self.assertRaises(errors.ResourceReadOnly): - fs.appendtext("foo", "bar") + def assertReadOnly(self, func, *args, **kwargs): + self.assertRaises(fs.errors.ResourceReadOnly, func, *args, **kwargs) - with self.assertRaises(errors.ResourceReadOnly): - fs.appendbytes("foo", b"bar") + def test_open_w(self): + self.assertReadOnly(self.ro.open, "foo", "w") - with self.assertRaises(errors.ResourceReadOnly): - fs.makedir("foo") + def test_appendtext(self): + self.assertReadOnly(self.ro.appendtext, "foo", "bar") - with self.assertRaises(errors.ResourceReadOnly): - fs.move("foo", "bar") + def test_appendbytes(self): + self.assertReadOnly(self.ro.appendbytes, "foo", b"bar") - with self.assertRaises(errors.ResourceReadOnly): - fs.openbin("foo", "w") + def test_makedir(self): + self.assertReadOnly(self.ro.makedir, "foo") - with self.assertRaises(errors.ResourceReadOnly): - fs.remove("foo") + def test_move(self): + self.assertReadOnly(self.ro.move, "foo", "bar") - with self.assertRaises(errors.ResourceReadOnly): - fs.removedir("foo") + def test_openbin_w(self): + self.assertReadOnly(self.ro.openbin, "foo", "w") - with self.assertRaises(errors.ResourceReadOnly): - fs.setinfo("foo", {}) + def test_remove(self): + self.assertReadOnly(self.ro.remove, "foo") - with self.assertRaises(errors.ResourceReadOnly): - fs.settimes("foo", {}) + def test_removedir(self): + self.assertReadOnly(self.ro.removedir, "foo") - with self.assertRaises(errors.ResourceReadOnly): - fs.copy("foo", "bar") + def test_removetree(self): + self.assertReadOnly(self.ro.removetree, "foo") - with self.assertRaises(errors.ResourceReadOnly): - fs.create("foo") + def test_setinfo(self): + self.assertReadOnly(self.ro.setinfo, "foo", {}) - with self.assertRaises(errors.ResourceReadOnly): - fs.writetext("foo", "bar") + def test_settimes(self): + self.assertReadOnly(self.ro.settimes, "foo", {}) - with self.assertRaises(errors.ResourceReadOnly): - fs.writebytes("foo", b"bar") + def test_copy(self): + self.assertReadOnly(self.ro.copy, "foo", "bar") - with self.assertRaises(errors.ResourceReadOnly): - fs.makedirs("foo/bar") + def test_create(self): + self.assertReadOnly(self.ro.create, "foo") - with self.assertRaises(errors.ResourceReadOnly): - fs.touch("foo") + def test_writetext(self): + self.assertReadOnly(self.ro.writetext, "foo", "bar") - with self.assertRaises(errors.ResourceReadOnly): - fs.upload("foo", None) + def test_writebytes(self): + self.assertReadOnly(self.ro.writebytes, "foo", b"bar") - with self.assertRaises(errors.ResourceReadOnly): - fs.writefile("foo", None) + def test_makedirs(self): + self.assertReadOnly(self.ro.makedirs, "foo/bar") - self.assertTrue(mem_fs.isempty("/")) - mem_fs.writebytes("file", b"read me") - with fs.openbin("file") as read_file: - self.assertEqual(read_file.read(), b"read me") + def test_touch(self): + self.assertReadOnly(self.ro.touch, "foo") - with fs.open("file", "rb") as read_file: - self.assertEqual(read_file.read(), b"read me") + def test_upload(self): + self.assertReadOnly(self.ro.upload, "foo", six.BytesIO()) - def test_cachedir(self): - mem_fs = open_fs("mem://") - mem_fs.makedirs("foo/bar/baz") - mem_fs.touch("egg") + def test_writefile(self): + self.assertReadOnly(self.ro.writefile, "foo", six.StringIO()) - fs = wrap.cache_directory(mem_fs) - self.assertEqual(sorted(fs.listdir("/")), ["egg", "foo"]) - self.assertEqual(sorted(fs.listdir("/")), ["egg", "foo"]) - self.assertTrue(fs.isdir("foo")) - self.assertTrue(fs.isdir("foo")) - self.assertTrue(fs.isfile("egg")) - self.assertTrue(fs.isfile("egg")) + def test_openbin_r(self): + self.fs.writebytes("file", b"read me") + with self.ro.openbin("file") as read_file: + self.assertEqual(read_file.read(), b"read me") - self.assertEqual(fs.getinfo("foo"), mem_fs.getinfo("foo")) - self.assertEqual(fs.getinfo("foo"), mem_fs.getinfo("foo")) + def test_open_r(self): + self.fs.writebytes("file", b"read me") + with self.ro.open("file", "rb") as read_file: + self.assertEqual(read_file.read(), b"read me") - self.assertEqual(fs.getinfo("/"), mem_fs.getinfo("/")) - self.assertEqual(fs.getinfo("/"), mem_fs.getinfo("/")) - with self.assertRaises(errors.ResourceNotFound): - fs.getinfo("/foofoo") +class TestWrapReadOnlySyspath(unittest.TestCase): + # If the wrapped fs has a syspath, there is a chance that somewhere + # in fs.copy or fs.mirror we try to use it to our advantage, but + # we want to make sure these implementations don't circumvent the + # wrapper. + + def setUp(self): + self.fs = open_fs("temp://") + self.ro = fs.wrap.read_only(self.fs) + self.src = open_fs("temp://") + self.src.touch("foo") + self.src.makedir("bar") + + def tearDown(self): + self.fs.close() + self.src.close() + + def assertReadOnly(self, func, *args, **kwargs): + self.assertRaises(fs.errors.ResourceReadOnly, func, *args, **kwargs) + + def test_copy_fs(self): + self.assertReadOnly(fs.copy.copy_fs, self.src, self.ro) + + def test_copy_fs_if_newer(self): + self.assertReadOnly(fs.copy.copy_fs_if_newer, self.src, self.ro) + + def test_copy_file(self): + self.assertReadOnly(fs.copy.copy_file, self.src, "foo", self.ro, "foo") + + def test_copy_file_if_newer(self): + self.assertReadOnly(fs.copy.copy_file_if_newer, self.src, "foo", self.ro, "foo") + + def test_copy_structure(self): + self.assertReadOnly(fs.copy.copy_structure, self.src, self.ro) + + def test_mirror(self): + self.assertReadOnly(fs.mirror.mirror, self.src, self.ro) + fs.mirror.mirror(self.src, self.fs) + self.fs.touch("baz") + self.assertReadOnly(fs.mirror.mirror, self.src, self.ro) + + def test_move_fs(self): + self.assertReadOnly(fs.move.move_fs, self.src, self.ro) + self.src.removetree("/") + self.fs.touch("foo") + self.assertReadOnly(fs.move.move_fs, self.ro, self.src) + + def test_move_file(self): + self.assertReadOnly(fs.move.move_file, self.src, "foo", self.ro, "foo") + self.fs.touch("baz") + self.assertReadOnly(fs.move.move_file, self.ro, "baz", self.src, "foo") + + def test_move_dir(self): + self.assertReadOnly(fs.move.move_file, self.src, "bar", self.ro, "bar") + self.fs.makedir("baz") + self.assertReadOnly(fs.move.move_dir, self.ro, "baz", self.src, "baz") + + +class TestWrapCachedDir(unittest.TestCase): + def setUp(self): + self.fs = open_fs("mem://") + self.fs.makedirs("foo/bar/baz") + self.fs.touch("egg") + self.cached = fs.wrap.cache_directory(self.fs) + + def tearDown(self): + self.fs.close() + + def assertNotFound(self, func, *args, **kwargs): + self.assertRaises(fs.errors.ResourceNotFound, func, *args, **kwargs) + + def test_scandir(self): + key = operator.attrgetter("name") + expected = [ + Info({"basic": {"name": "egg", "is_dir": False}}), + Info({"basic": {"name": "foo", "is_dir": True}}), + ] + with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: + self.assertEqual(sorted(self.cached.scandir("/"), key=key), expected) + scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)]) + with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: + self.assertEqual(sorted(self.cached.scandir("/"), key=key), expected) + scandir.assert_not_called() + + def test_isdir(self): + with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: + self.assertTrue(self.cached.isdir("foo")) + self.assertFalse(self.cached.isdir("egg")) # is file + self.assertFalse(self.cached.isdir("spam")) # doesn't exist + scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)]) + with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: + self.assertTrue(self.cached.isdir("foo")) + self.assertFalse(self.cached.isdir("egg")) + self.assertFalse(self.cached.isdir("spam")) + scandir.assert_not_called() + + def test_isfile(self): + with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: + self.assertTrue(self.cached.isfile("egg")) + self.assertFalse(self.cached.isfile("foo")) # is dir + self.assertFalse(self.cached.isfile("spam")) # doesn't exist + scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)]) + with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: + self.assertTrue(self.cached.isfile("egg")) + self.assertFalse(self.cached.isfile("foo")) + self.assertFalse(self.cached.isfile("spam")) + scandir.assert_not_called() + + def test_getinfo(self): + with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: + self.assertEqual(self.cached.getinfo("foo"), self.fs.getinfo("foo")) + self.assertEqual(self.cached.getinfo("/"), self.fs.getinfo("/")) + self.assertNotFound(self.cached.getinfo, "spam") + scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)]) + with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: + self.assertEqual(self.cached.getinfo("foo"), self.fs.getinfo("foo")) + self.assertEqual(self.cached.getinfo("/"), self.fs.getinfo("/")) + self.assertNotFound(self.cached.getinfo, "spam") + scandir.assert_not_called()