Skip to content

Commit 274d578

Browse files
committed
add gcsfs.GCSFileSystem with patched find method
Workaround until fsspec/gcsfs#488 is merged
1 parent 3250c38 commit 274d578

File tree

2 files changed

+72
-1
lines changed

2 files changed

+72
-1
lines changed

dvc_gs/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ def _prepare_credentials(self, **config):
2121
@wrap_prop(threading.Lock())
@cached_property
def fs(self):
    """Build the GCS filesystem from ``self.fs_args``.

    The filesystem class is the package-local patched ``GCSFileSystem``
    rather than the one shipped by ``gcsfs``.
    """
    # TODO: Use `gcsfs` when https://github.com/fsspec/gcsfs/pull/488
    # is merged and its version bumped
    from .gcsfs import GCSFileSystem as PatchedGCSFileSystem

    filesystem = PatchedGCSFileSystem(**self.fs_args)
    return filesystem
2729

dvc_gs/gcsfs.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# pylint: disable=abstract-method
2+
# TODO: remove this module when https://github.com/fsspec/gcsfs/pull/488
3+
# is merged and version is bumped
4+
5+
from gcsfs import GCSFileSystem as GCSFileSystem_
6+
7+
8+
class GCSFileSystem(GCSFileSystem_):
    """``gcsfs.GCSFileSystem`` with a patched ``_find``.

    Temporary workaround until https://github.com/fsspec/gcsfs/pull/488 is
    merged and released.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def _find(
        self, path, withdirs=False, detail=False, prefix="", **kwargs
    ):
        """List everything below ``path`` using one flat listing call.

        Args:
            path: Location to search; the protocol is stripped and the
                result is split into bucket and key.
            withdirs: When true, also include the pseudo-directories
                implied by the object names (at or below ``path``).
            detail: When true, return a ``{name: info}`` dict instead of a
                list of names.
            prefix: Extra name prefix appended to the key of ``path``; when
                given, the directory cache is not updated.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            A list of names, or a dict mapping each name to its info dict
            when ``detail`` is true. With ``withdirs`` the entries are
            sorted by name.
        """
        path = self._strip_protocol(path)
        bucket, key = self.split_path(path)

        if prefix:
            base = key.rstrip("/") + "/" if key else ""
            listing_prefix = f"{base}{prefix}"
        else:
            listing_prefix = key

        objects, _ = await self._do_list_objects(
            bucket, delimiter="", prefix=listing_prefix
        )

        dirs = {}
        cache_entries = {}

        for obj in objects:
            parent = self._parent(obj["name"])
            previous = obj

            while parent:
                dir_key = self.split_path(parent)[1]
                if not dir_key or len(parent) < len(path):
                    # Stop at the bucket root, and don't go above the
                    # requested level: such directories must neither be
                    # reported nor cached.
                    break

                already_seen = parent in dirs
                if not already_seen:
                    dirs[parent] = {
                        "Key": dir_key,
                        "Size": 0,
                        "name": parent,
                        "StorageClass": "DIRECTORY",
                        "type": "directory",
                        "size": 0,
                    }

                cache_entries.setdefault(parent, []).append(previous)

                if already_seen:
                    # This directory was linked into its ancestors when it
                    # was first created; walking further up would append
                    # the same directory entries again.
                    break

                previous = dirs[parent]
                parent = self._parent(parent)

        if not prefix:
            # Only an unfiltered listing is complete enough to be cached.
            self.dircache.update(cache_entries)

        if withdirs:
            objects = sorted(
                objects + list(dirs.values()), key=lambda x: x["name"]
            )

        if detail:
            return {o["name"]: o for o in objects}

        return [o["name"] for o in objects]

0 commit comments

Comments
 (0)