Skip to content

Commit 92aff0b

Browse files
[Python] Support DLF OSS endpoint override in RESTTokenFileIO (#6749)
1 parent 044bf09 commit 92aff0b

File tree

3 files changed

+92
-1
lines changed

3 files changed

+92
-1
lines changed

paimon-python/pypaimon/catalog/rest/rest_token_file_io.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
from pypaimon.api.rest_api import RESTApi
2626
from pypaimon.catalog.rest.rest_token import RESTToken
27+
from pypaimon.common.config import CatalogOptions, OssOptions
2728
from pypaimon.common.file_io import FileIO
2829
from pypaimon.common.identifier import Identifier
2930

@@ -57,9 +58,18 @@ def __setstate__(self, state):
5758

5859
def _initialize_oss_fs(self, path) -> FileSystem:
5960
self.try_to_refresh_token()
60-
self.properties.update(self.token.token)
61+
merged_token = self._merge_token_with_catalog_options(self.token.token)
62+
self.properties.update(merged_token)
6163
return super()._initialize_oss_fs(path)
6264

65+
def _merge_token_with_catalog_options(self, token: dict) -> dict:
66+
"""Merge token with catalog options, DLF OSS endpoint should override the standard OSS endpoint."""
67+
merged_token = dict(token)
68+
dlf_oss_endpoint = self.properties.get(CatalogOptions.DLF_OSS_ENDPOINT)
69+
if dlf_oss_endpoint and dlf_oss_endpoint.strip():
70+
merged_token[OssOptions.OSS_ENDPOINT] = dlf_oss_endpoint
71+
return merged_token
72+
6373
def new_output_stream(self, path: str):
6474
# Call parent class method to ensure path conversion and parent directory creation
6575
return super().new_output_stream(path)

paimon-python/pypaimon/common/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class CatalogOptions:
4545
DLF_TOKEN_LOADER = "dlf.token-loader"
4646
DLF_TOKEN_ECS_ROLE_NAME = "dlf.token-ecs-role-name"
4747
DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url"
48+
DLF_OSS_ENDPOINT = "dlf.oss-endpoint"
4849
PREFIX = 'prefix'
4950
HTTP_USER_AGENT_HEADER = 'header.HTTP_USER_AGENT'
5051
BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2**31 - 1

paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
# limitations under the License.
1717
################################################################################
1818
import os
19+
import pickle
1920
import tempfile
2021
import unittest
2122
from unittest.mock import patch
2223

2324
from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
25+
from pypaimon.common.config import CatalogOptions, OssOptions
2426
from pypaimon.common.file_io import FileIO
2527
from pypaimon.common.identifier import Identifier
2628

@@ -110,6 +112,84 @@ def test_new_output_stream_behavior_matches_parent(self):
110112
read_content = stream.read()
111113
self.assertEqual(read_content, test_content)
112114

115+
def test_pickle_serialization(self):
116+
with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
117+
original_file_io = RESTTokenFileIO(
118+
self.identifier,
119+
self.warehouse_path,
120+
self.catalog_options
121+
)
122+
123+
self.assertTrue(hasattr(original_file_io, 'lock'))
124+
self.assertIsNotNone(original_file_io.lock)
125+
126+
pickled = pickle.dumps(original_file_io)
127+
128+
deserialized_file_io = pickle.loads(pickled)
129+
130+
self.assertEqual(deserialized_file_io.identifier, original_file_io.identifier)
131+
self.assertEqual(deserialized_file_io.path, original_file_io.path)
132+
self.assertEqual(deserialized_file_io.properties, original_file_io.properties)
133+
134+
self.assertTrue(hasattr(deserialized_file_io, 'lock'))
135+
self.assertIsNotNone(deserialized_file_io.lock)
136+
self.assertIsNot(deserialized_file_io.lock, original_file_io.lock)
137+
138+
self.assertIsNone(deserialized_file_io.api_instance)
139+
140+
test_file_path = f"file://{self.temp_dir}/pickle_test.txt"
141+
test_content = b"pickle test content"
142+
143+
with deserialized_file_io.new_output_stream(test_file_path) as stream:
144+
stream.write(test_content)
145+
146+
expected_path = f"{self.temp_dir}/pickle_test.txt"
147+
self.assertTrue(os.path.exists(expected_path))
148+
with open(expected_path, 'rb') as f:
149+
self.assertEqual(f.read(), test_content)
150+
151+
def test_dlf_oss_endpoint_overrides_token_endpoint(self):
152+
"""Test that DLF OSS endpoint overrides the standard OSS endpoint in token."""
153+
dlf_oss_endpoint = "https://dlf-custom-endpoint.oss-cn-hangzhou.aliyuncs.com"
154+
catalog_options = {
155+
CatalogOptions.DLF_OSS_ENDPOINT: dlf_oss_endpoint
156+
}
157+
158+
with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
159+
file_io = RESTTokenFileIO(
160+
self.identifier,
161+
self.warehouse_path,
162+
catalog_options
163+
)
164+
165+
# Create a token with a standard OSS endpoint
166+
token = {
167+
OssOptions.OSS_ENDPOINT: "https://standard-endpoint.oss-cn-beijing.aliyuncs.com",
168+
"fs.oss.accessKeyId": "test-access-key",
169+
"fs.oss.accessKeySecret": "test-secret-key"
170+
}
171+
172+
# Merge token with catalog options
173+
merged_token = file_io._merge_token_with_catalog_options(token)
174+
175+
# Verify DLF OSS endpoint overrides the standard endpoint
176+
self.assertEqual(
177+
merged_token[OssOptions.OSS_ENDPOINT],
178+
dlf_oss_endpoint,
179+
"DLF OSS endpoint should override the standard OSS endpoint in token"
180+
)
181+
# Verify other token properties are preserved
182+
self.assertEqual(
183+
merged_token["fs.oss.accessKeyId"],
184+
"test-access-key",
185+
"Other token properties should be preserved"
186+
)
187+
self.assertEqual(
188+
merged_token["fs.oss.accessKeySecret"],
189+
"test-secret-key",
190+
"Other token properties should be preserved"
191+
)
192+
113193

114194
if __name__ == '__main__':
115195
unittest.main()

0 commit comments

Comments
 (0)