|
| 1 | +from typing import Optional, Tuple |
| 2 | + |
| 3 | +from dvc_objects.fs.base import AnyFSPath |
| 4 | +from dvc_objects.fs.path import Path |
| 5 | + |
| 6 | + |
| 7 | +class GSPath(Path): |
| 8 | + def split_version(self, path: AnyFSPath) -> Tuple[str, Optional[str]]: |
| 9 | + from gcsfs import GCSFileSystem |
| 10 | + |
| 11 | + parts = GCSFileSystem._split_path( # pylint: disable=protected-access |
| 12 | + path, version_aware=True |
| 13 | + ) |
| 14 | + bucket, key, generation = parts |
| 15 | + return f"gs://{bucket}/{key}", generation |
| 16 | + |
| 17 | + def join_version(self, path: AnyFSPath, version_id: Optional[str]) -> str: |
| 18 | + path, path_version = self.split_version(path) |
| 19 | + if path_version: |
| 20 | + raise ValueError("path already includes an object generation") |
| 21 | + return f"{path}#{version_id}" |
| 22 | + |
| 23 | + def version_path(self, path: AnyFSPath, version_id: Optional[str]) -> str: |
| 24 | + path, _ = self.split_version(path) |
| 25 | + return self.join_version(path, version_id) |
| 26 | + |
| 27 | + def coalesce_version( |
| 28 | + self, path: AnyFSPath, version_id: Optional[str] |
| 29 | + ) -> Tuple[AnyFSPath, Optional[str]]: |
| 30 | + path, path_version_id = self.split_version(path) |
| 31 | + versions = {ver for ver in (version_id, path_version_id) if ver} |
| 32 | + if len(versions) > 1: |
| 33 | + raise ValueError( |
| 34 | + f"Path version mismatch: '{path}', '{version_id}'" |
| 35 | + ) |
| 36 | + return path, (versions.pop() if versions else None) |
0 commit comments