
Commit 2799c01

[python] Fix read large volume of blob data (#6701)
1 parent 64e90d6 commit 2799c01

3 files changed: +258 -10 lines

paimon-python/pypaimon/read/reader/concat_batch_reader.py (10 additions, 8 deletions)

@@ -20,6 +20,7 @@
 from typing import Callable, List, Optional
 
 import pyarrow as pa
+import pyarrow.dataset as ds
 from pyarrow import RecordBatch
 
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
@@ -89,14 +90,15 @@ class MergeAllBatchReader(RecordBatchReader):
     into a single batch for processing.
     """
 
-    def __init__(self, reader_suppliers: List[Callable]):
+    def __init__(self, reader_suppliers: List[Callable], batch_size: int = 4096):
         self.reader_suppliers = reader_suppliers
         self.merged_batch: Optional[RecordBatch] = None
-        self.batch_created = False
+        self.reader = None
+        self._batch_size = batch_size
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
-        if self.batch_created:
-            return None
+        if self.reader:
+            return self.reader.read_next_batch()
 
         all_batches = []
 
@@ -140,10 +142,10 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
             )
         else:
             self.merged_batch = None
-
-        self.batch_created = True
-        return self.merged_batch
+        dataset = ds.InMemoryDataset(self.merged_batch)
+        self.reader = dataset.scanner(batch_size=self._batch_size).to_reader()
+        return self.reader.read_next_batch()
 
     def close(self) -> None:
         self.merged_batch = None
-        self.batch_created = False
+        self.reader = None
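
For context, a minimal standalone sketch of the pyarrow.dataset mechanism the patched MergeAllBatchReader relies on (the toy column and the 4096-row chunk size are illustrative assumptions, not code from the repository): instead of returning the merged RecordBatch in one piece, the reader wraps it in an InMemoryDataset and scans it with a bounded batch_size, so each call hands back at most that many rows.

import pyarrow as pa
import pyarrow.dataset as ds

# A large in-memory batch standing in for the merged blob batch.
merged = pa.RecordBatch.from_pydict({'id': list(range(10_000))})

# Re-emit it in bounded chunks, mirroring the patched read_arrow_batch().
reader = ds.InMemoryDataset(merged).scanner(batch_size=4096).to_reader()

chunks = []
while True:
    try:
        chunks.append(reader.read_next_batch())  # raises StopIteration when drained
    except StopIteration:
        break

assert sum(c.num_rows for c in chunks) == merged.num_rows
assert all(c.num_rows <= 4096 for c in chunks)

Keeping the scanner's reader around and serving read_next_batch() on later calls is what bounds the size of each batch handed downstream, which is presumably why very large blob volumes no longer fail.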

paimon-python/pypaimon/read/reader/format_blob_reader.py (5 additions, 2 deletions)

@@ -34,12 +34,13 @@
 class FormatBlobReader(RecordBatchReader):
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
-                 full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool):
+                 full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool,
+                 batch_size: int = 4096):
         self._file_io = file_io
         self._file_path = file_path
         self._push_down_predicate = push_down_predicate
         self._blob_as_descriptor = blob_as_descriptor
-
+        self._batch_size = batch_size
         # Get file size
         self._file_size = file_io.get_file_size(file_path)
 
@@ -92,6 +93,8 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
                 pydict_data[field_name].append(blob_data)
 
                 records_in_batch += 1
+                if records_in_batch >= self._batch_size:
+                    break
 
         except StopIteration:
             # Stop immediately when StopIteration occurs
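
The FormatBlobReader change applies the same idea at the file level: stop accumulating records once batch_size of them have been collected, so a single read_arrow_batch() call never materializes the whole blob file. Below is a hedged, self-contained sketch of that accumulate-then-break pattern; the read_in_batches helper, its records iterator, and the single large_blob column are illustrative stand-ins, not the actual FormatBlobReader internals.

from typing import Iterator

import pyarrow as pa


def read_in_batches(records: Iterator[bytes], batch_size: int = 4096) -> Iterator[pa.RecordBatch]:
    # Hypothetical helper: drain an arbitrary record iterator in bounded chunks.
    schema = pa.schema([('large_blob', pa.large_binary())])
    while True:
        blobs = []
        try:
            while len(blobs) < batch_size:  # the cap this commit introduces
                blobs.append(next(records))
        except StopIteration:
            pass  # source exhausted; emit whatever was collected
        if not blobs:
            return
        yield pa.RecordBatch.from_pydict({'large_blob': blobs}, schema=schema)

With the new tests' payloads of roughly 4 KB per row, a 4096-row batch comes to something on the order of 17 MB, instead of the ~80 MB a single unbounded batch of all 20,000 rows would hold.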

paimon-python/pypaimon/tests/blob_table_test.py (243 additions, 0 deletions)

@@ -1163,6 +1163,116 @@ def test_blob_write_read_large_data_end_to_end(self):
         print(
             f"✅ Large blob end-to-end test passed: wrote and read back {len(blob_data)} large blob records correctly")  # noqa: E501
 
+    def test_blob_write_read_large_data_volume(self):
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('batch_id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_large_data_volume', schema, False)
+        table = self.catalog.get_table('test_db.blob_large_data_volume')
+
+        large_blob_size = 5 * 1024
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        num_row = 20000
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        expected = pa.Table.from_pydict({
+            'id': [1] * num_row,
+            'batch_id': [11] * num_row,
+            'metadata': [f'Large blob batch {11}'] * num_row,
+            'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+        }, schema=pa_schema)
+        writer.write_arrow(expected)
+
+        # Commit all data at once
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data
+        self.assertEqual(result.num_rows, num_row)
+        self.assertEqual(result.num_columns, 4)
+
+        self.assertEqual(expected, result)
+
+    def test_blob_write_read_large_data_volume_rolling(self):
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('batch_id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob.target-file-size': '21MB'
+            }
+        )
+        self.catalog.create_table('test_db.large_data_volume_rolling', schema, False)
+        table = self.catalog.get_table('test_db.large_data_volume_rolling')
+
+        # Create large blob data
+        large_blob_size = 5 * 1024
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        actual_size = len(large_blob_data)
+        print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 * 1024):.2f} MB)")
+        num_row = 20000
+        expected = pa.Table.from_pydict({
+            'id': [1] * num_row,
+            'batch_id': [11] * num_row,
+            'metadata': [f'Large blob batch {11}'] * num_row,
+            'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+        }, schema=pa_schema)
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(expected)
+
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+        self.assertEqual(expected, result)
+
     def test_blob_write_read_mixed_sizes_end_to_end(self):
         """Test end-to-end blob functionality with mixed blob sizes."""
         from pypaimon import Schema
@@ -2106,6 +2216,72 @@ def test_blob_write_read_large_data_with_rolling_with_shard(self):
         self.assertEqual(actual.column('id').to_pylist(), list(range(1, 161)), "ID column should match")
         self.assertEqual(actual, expected)
 
+    def test_blob_large_data_volume_with_shard(self):
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('batch_id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_large_data_volume_with_shard', schema, False)
+        table = self.catalog.get_table('test_db.blob_large_data_volume_with_shard')
+
+        large_blob_size = 5 * 1024
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        num_row = 20000
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        expected = pa.Table.from_pydict({
+            'id': [1] * num_row,
+            'batch_id': [11] * num_row,
+            'metadata': [f'Large blob batch {11}'] * num_row,
+            'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+        }, schema=pa_schema)
+        writer.write_arrow(expected)
+
+        # Commit all data at once
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan().with_shard(2, 3)
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data
+        self.assertEqual(6666, result.num_rows)
+        self.assertEqual(4, result.num_columns)
+
+        self.assertEqual(expected.slice(13334, 6666), result)
+        splits = read_builder.new_scan().plan().splits()
+        expected = table_read.to_arrow(splits)
+        splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+        actual1 = table_read.to_arrow(splits1)
+        splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+        actual2 = table_read.to_arrow(splits2)
+        splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+        actual3 = table_read.to_arrow(splits3)
+        actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
+        self.assertEqual(actual, expected)
+
     def test_data_blob_writer_with_shard(self):
         """Test DataBlobWriter with mixed data types in blob column."""
         from pypaimon import Schema
@@ -2168,6 +2344,73 @@ def test_data_blob_writer_with_shard(self):
         self.assertEqual(result.num_rows, 2, "Should have 2 rows")
         self.assertEqual(result.num_columns, 3, "Should have 3 columns")
 
+    def test_blob_write_read_large_data_volume_rolling_with_shard(self):
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('batch_id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob.target-file-size': '10MB'
+            }
+        )
+        self.catalog.create_table('test_db.test_blob_write_read_large_data_volume_rolling_with_shard', schema, False)
+        table = self.catalog.get_table('test_db.test_blob_write_read_large_data_volume_rolling_with_shard')
+
+        # Create large blob data
+        large_blob_size = 5 * 1024
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        actual_size = len(large_blob_data)
+        print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 * 1024):.2f} MB)")
+        # Write 10,000 rows in a single batch
+        num_row = 10000
+        expected = pa.Table.from_pydict({
+            'id': [1] * num_row,
+            'batch_id': [11] * num_row,
+            'metadata': [f'Large blob batch {11}'] * num_row,
+            'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+        }, schema=pa_schema)
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(expected)
+
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+        self.assertEqual(expected, result)
+
+        splits = read_builder.new_scan().plan().splits()
+        expected = table_read.to_arrow(splits)
+        splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+        actual1 = table_read.to_arrow(splits1)
+        splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+        actual2 = table_read.to_arrow(splits2)
+        splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+        actual3 = table_read.to_arrow(splits3)
+        actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
+
+        self.assertEqual(actual, expected)
+
 
 if __name__ == '__main__':
     unittest.main()
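
One detail of the new tests worth noting: each row's blob value is the row index encoded as two little-endian bytes, followed by the repeated filler pattern, so every one of the 20,000 (or 10,000) rows is distinct and a failed equality check points at a specific row. A small standalone decoding sketch of that scheme (variable names are assumptions mirroring the test code):

import struct

# Rebuild one payload the way the tests do, then recover the row index.
blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024          # ~1KB pattern
large_blob_data = blob_pattern * (5 * 1024 // len(blob_pattern))

payload = (123).to_bytes(2, byteorder='little') + large_blob_data
row_index = struct.unpack('<H', payload[:2])[0]               # 2-byte little-endian unsigned

assert row_index == 123
assert payload[2:] == large_blob_data

Two bytes suffice here because the row counts stay below 65,536; a larger test would need a wider prefix.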
