|
1 | 1 | import threading |
2 | | -from typing import Iterator, List, Optional, Union |
| 2 | +from typing import Iterator, List, Optional, Tuple, Union |
| 3 | +from urllib.parse import urlsplit, urlunsplit |
3 | 4 |
|
4 | 5 | # pylint:disable=abstract-method |
5 | 6 | from dvc.utils.objects import cached_property |
6 | 7 | from dvc_objects.fs.base import AnyFSPath, ObjectFileSystem |
7 | 8 | from funcy import wrap_prop |
8 | 9 |
|
9 | | -from .path import GSPath |
10 | | - |
11 | 10 |
|
12 | 11 | class GSFileSystem(ObjectFileSystem): |
13 | 12 | protocol = "gs" |
14 | 13 | REQUIRES = {"gcsfs": "gcsfs"} |
15 | 14 | PARAM_CHECKSUM = "etag" |
16 | 15 |
|
17 | | - @cached_property |
18 | | - def path(self) -> GSPath: |
19 | | - def _getcwd(): |
20 | | - return self.fs.root_marker |
def getcwd(self):
    """Return the root marker of the wrapped filesystem as the cwd."""
    root_marker = self.fs.root_marker
    return root_marker
21 | 18 |
|
22 | | - return GSPath(self.sep, getcwd=_getcwd) |
@classmethod
def split_version(cls, path: AnyFSPath) -> Tuple[str, Optional[str]]:
    """Split *path* into its unversioned form and its generation.

    The path is parsed by ``GCSFileSystem._split_path`` in version-aware
    mode and rebuilt as ``[scheme://]bucket/key``.

    Returns:
        A ``(path, generation)`` tuple, where ``generation`` is whatever
        ``_split_path`` reported for the path.
    """
    from gcsfs import GCSFileSystem

    # NOTE: we use urlsplit/unsplit here to strip scheme before calling
    # GCSFileSystem._split_path, otherwise it will consider DVC
    # remote:// protocol to be a bucket named "remote:"
    scheme, *remainder = urlsplit(path)
    schemeless = urlunsplit(("", *remainder))
    split_path = GCSFileSystem._split_path  # pylint: disable=protected-access
    bucket, key, generation = split_path(schemeless, version_aware=True)
    # Put the original scheme back in front of the rebuilt bucket/key path.
    prefix = f"{scheme}://" if scheme else ""
    return f"{prefix}{bucket}/{key}", generation
| 36 | + |
@classmethod
def join_version(cls, path: AnyFSPath, version_id: Optional[str]) -> str:
    """Append *version_id* to *path* as a ``#``-separated suffix.

    Returns:
        ``path#version_id``, or the path as rebuilt by ``split_version``
        when *version_id* is empty or ``None``.

    Raises:
        ValueError: if *path* already carries a generation.
    """
    bare_path, existing_version = cls.split_version(path)
    if existing_version:
        raise ValueError("path already includes an object generation")
    if not version_id:
        return bare_path
    return f"{bare_path}#{version_id}"
| 43 | + |
@classmethod
def version_path(cls, path: AnyFSPath, version_id: Optional[str]) -> str:
    """Return *path* with any existing generation replaced by *version_id*.

    Whatever generation *path* already carries is discarded before the
    new one is joined on, so this never raises for a versioned input.
    """
    unversioned = cls.split_version(path)[0]
    return cls.join_version(unversioned, version_id)
| 48 | + |
@classmethod
def coalesce_version(
    cls, path: AnyFSPath, version_id: Optional[str]
) -> Tuple[AnyFSPath, Optional[str]]:
    """Reconcile an explicit *version_id* with one embedded in *path*.

    Returns:
        A ``(path, version)`` tuple: the path without its generation
        suffix, and the single version supplied by either source, or
        ``None`` when neither provides one.

    Raises:
        ValueError: if both sources give a version and they differ.
    """
    bare_path, embedded_version = cls.split_version(path)
    # Only two non-empty, unequal versions count as a conflict.
    conflicting = (
        bool(version_id)
        and bool(embedded_version)
        and version_id != embedded_version
    )
    if conflicting:
        raise ValueError(
            f"Path version mismatch: '{bare_path}', '{version_id}'"
        )
    return bare_path, (version_id or embedded_version or None)
23 | 60 |
|
24 | 61 | def _prepare_credentials(self, **config): |
25 | 62 | login_info = {"consistency": None} |
@@ -69,8 +106,8 @@ def find( |
69 | 106 | ) -> Iterator[str]: |
70 | 107 | def _add_dir_sep(path: str) -> str: |
71 | 108 | # NOTE: gcsfs expects explicit trailing slash for dir find() |
72 | | - if self.isdir(path) and not path.endswith(self.path.flavour.sep): |
73 | | - return path + self.path.flavour.sep |
| 109 | + if self.isdir(path) and not path.endswith(self.sep): |
| 110 | + return path + self.sep |
74 | 111 | return path |
75 | 112 |
|
76 | 113 | if not prefix: |
|
0 commit comments