From 13fd23b20fb819be3a31fcdf19d743189319c44a Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Thu, 12 Mar 2026 17:46:38 +0100 Subject: [PATCH 1/3] feat: low-memory ingestion using BufferedStream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace BytesIO with BufferedStream (SpooledTemporaryFile, 5MB threshold) throughout the fetch and store pipeline to avoid loading large files fully into memory. - Add BufferedStream to utils: stays in memory up to 5MB, spills to disk - Stream HTTP response body via iter_content(1MB chunks) into BufferedStream, hashing on the fly — no more response.content loading full body into memory - Add http_decompress=True option to retrieve_http: stream-decompresses gzip content (e.g. .json.gz from S3) without double-compressing on store - Use BufferedStream in _prepare_write_stream and _prepare_read_stream for compress/decompress in the store - DraftFile.stream typed as BufferedStream with coercing validator for backwards compatibility (accepts BytesIO, bytes, or any readable) --- ingestify/application/dataset_store.py | 11 ++-- ingestify/domain/models/dataset/file.py | 38 ++++++++----- ingestify/infra/fetch/http.py | 74 ++++++++++++++++--------- ingestify/tests/test_http_fetch.py | 69 +++++++++++++++++++++++ ingestify/utils.py | 26 ++++++++- 5 files changed, 172 insertions(+), 46 deletions(-) create mode 100644 ingestify/tests/test_http_fetch.py diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index 66a8fdf..7e9cf66 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -5,6 +5,7 @@ from contextlib import contextmanager import threading from io import BytesIO +from ingestify.utils import BufferedStream from typing import ( Dict, @@ -298,9 +299,9 @@ def iter_dataset_collection_batches( # dataset = self.dataset_repository. # self.dataset_repository.destroy_dataset(dataset_id) - def _prepare_write_stream(self, file_: DraftFile) -> tuple[BytesIO, int, str]: + def _prepare_write_stream(self, file_: DraftFile) -> tuple[BinaryIO, int, str]: if self.storage_compression_method == "gzip": - stream = BytesIO() + stream = BufferedStream() with gzip.GzipFile(fileobj=stream, compresslevel=9, mode="wb") as fp: shutil.copyfileobj(file_.stream, fp) @@ -317,11 +318,11 @@ def _prepare_write_stream(self, file_: DraftFile) -> tuple[BytesIO, int, str]: def _prepare_read_stream( self, - ) -> tuple[Callable[[BinaryIO], Awaitable[BytesIO]], str]: + ) -> tuple[Callable[[BinaryIO], Awaitable[BinaryIO]], str]: if self.storage_compression_method == "gzip": - def reader(fh: BinaryIO) -> BytesIO: - stream = BytesIO() + def reader(fh: BinaryIO) -> BinaryIO: + stream = BufferedStream() with gzip.GzipFile(fileobj=fh, compresslevel=9, mode="rb") as fp: shutil.copyfileobj(fp, stream) stream.seek(0) diff --git a/ingestify/domain/models/dataset/file.py b/ingestify/domain/models/dataset/file.py index 6dfa3d4..9c6a857 100644 --- a/ingestify/domain/models/dataset/file.py +++ b/ingestify/domain/models/dataset/file.py @@ -4,8 +4,10 @@ from io import BytesIO, StringIO import hashlib +from pydantic import field_validator + from ingestify.domain.models.base import BaseModel -from ingestify.utils import utcnow +from ingestify.utils import utcnow, BufferedStream class DraftFile(BaseModel): @@ -17,7 +19,19 @@ class DraftFile(BaseModel): data_feed_key: str # Example: 'events' data_spec_version: str # Example: 'v3' data_serialization_format: str # Example: 'json' - stream: BytesIO + stream: BufferedStream + + @field_validator("stream", mode="before") + @classmethod + def coerce_to_buffered_stream(cls, v): + if isinstance(v, BufferedStream): + return v + if isinstance(v, (BytesIO, bytes)): + data = v if isinstance(v, bytes) else v.getvalue() + return BufferedStream.from_stream(BytesIO(data)) + if hasattr(v, "read"): + return BufferedStream.from_stream(v) + raise ValueError(f"Cannot coerce {type(v)} to BufferedStream") @classmethod def from_input( @@ -32,26 +46,20 @@ def from_input( if isinstance(file_, (DraftFile, NotModifiedFile)): return file_ elif isinstance(file_, str): - stream = BytesIO(file_.encode("utf-8")) + data = file_.encode("utf-8") elif isinstance(file_, bytes): - stream = BytesIO(file_) + data = file_ elif isinstance(file_, StringIO): - stream = BytesIO(file_.read().encode("utf-8")) - elif isinstance(file_, BytesIO): - stream = file_ + data = file_.read().encode("utf-8") elif hasattr(file_, "read"): - data = file_.read() - if isinstance(data, bytes): - stream = BytesIO(data) - else: - stream = BytesIO(data.encode("utf-8")) + raw = file_.read() + data = raw if isinstance(raw, bytes) else raw.encode("utf-8") else: raise Exception(f"Not possible to create DraftFile from {type(file_)}") - data = stream.read() size = len(data) tag = hashlib.sha1(data).hexdigest() - stream.seek(0) + stream = BufferedStream.from_stream(BytesIO(data)) now = utcnow() @@ -127,7 +135,7 @@ class LoadedFile(BaseModel): data_serialization_format: Optional[str] # Example: 'json' storage_compression_method: Optional[str] # Example: 'gzip' storage_path: Path - stream_: Union[BinaryIO, BytesIO, Callable[[], Awaitable[Union[BinaryIO, BytesIO]]]] + stream_: Union[BinaryIO, BytesIO, BufferedStream, Callable[[], Awaitable[Union[BinaryIO, BytesIO, BufferedStream]]]] revision_id: Optional[int] = None # This can be used when a Revision is squashed def load_stream(self): diff --git a/ingestify/infra/fetch/http.py b/ingestify/infra/fetch/http.py index 45bb651..bb51ddd 100644 --- a/ingestify/infra/fetch/http.py +++ b/ingestify/infra/fetch/http.py @@ -1,9 +1,11 @@ +import gzip import json +import shutil from datetime import datetime from email.utils import format_datetime, parsedate from hashlib import sha1 from io import BytesIO -from typing import Optional, Callable, Tuple, Union +from typing import BinaryIO, Optional, Callable, Tuple, Union import requests from requests.adapters import HTTPAdapter @@ -11,7 +13,7 @@ from ingestify.domain.models import DraftFile, File from ingestify.domain.models.dataset.file import NotModifiedFile -from ingestify.utils import utcnow +from ingestify.utils import utcnow, BufferedStream _session = None @@ -40,6 +42,17 @@ def get_session(): return _session +def _decompress(source: BinaryIO) -> tuple[BufferedStream, int]: + """Stream-decompress gzip content into a BufferedStream, returning (stream, uncompressed_size).""" + stream = BufferedStream() + with gzip.GzipFile(fileobj=source, mode="rb") as gz: + shutil.copyfileobj(gz, stream) + stream.seek(0, 2) + size = stream.tell() + stream.seek(0) + return stream, size + + def retrieve_http( url, current_file: Optional[File] = None, @@ -74,8 +87,9 @@ def retrieve_http( raise Exception(f"Don't know how to use {key}") ignore_not_found = http_kwargs.pop("ignore_not_found", False) + decompress = http_kwargs.pop("decompress", False) - response = get_session().get(url, headers=headers, **http_kwargs) + response = get_session().get(url, headers=headers, stream=True, **http_kwargs) if response.status_code == 404 and ignore_not_found: return NotModifiedFile( modified_at=last_modified, reason="404 http code and ignore-not-found" @@ -96,12 +110,9 @@ def retrieve_http( modified_at = utcnow() tag = response.headers.get("etag") - # content_length = int(response.headers.get("content-length", 0)) if pager: - """ - A pager helps with responses that return the data in pages. - """ + # Pager assembles multiple small JSON responses — load fully into memory data_path, pager_fn = pager data = [] while True: @@ -111,24 +122,37 @@ def retrieve_http( if not next_url: break else: - response = requests.get(next_url, headers=headers, **http_kwargs) - - content = json.dumps({data_path: data}).encode("utf-8") + response = requests.get(next_url, headers=headers, stream=True, **http_kwargs) + + content_bytes = json.dumps({data_path: data}).encode("utf-8") + if not tag: + tag = sha1(content_bytes).hexdigest() + if current_file and current_file.tag == tag: + return NotModifiedFile(modified_at=last_modified, reason="tag matched current_file") + stream = BufferedStream.from_stream(BytesIO(content_bytes)) + content_length = len(content_bytes) else: - content = response.content - - if not tag: - tag = sha1(content).hexdigest() - - # if not content_length: - Don't use http header as it might be wrong - # for example in case of compressed data - content_length = len(content) - - if current_file and current_file.tag == tag: - # Not changed. Don't keep it - return NotModifiedFile( - modified_at=last_modified, reason="tag matched current_file" - ) + # Stream response body directly into BufferedStream, hashing on the fly + raw_stream = BufferedStream() + hasher = sha1() + for chunk in response.iter_content(chunk_size=1024 * 1024): + hasher.update(chunk) + raw_stream.write(chunk) + + if not tag: + tag = hasher.hexdigest() + + if current_file and current_file.tag == tag: + return NotModifiedFile(modified_at=last_modified, reason="tag matched current_file") + + raw_stream.seek(0) + if decompress: + stream, content_length = _decompress(raw_stream) + else: + raw_stream.seek(0, 2) + content_length = raw_stream.tell() + raw_stream.seek(0) + stream = raw_stream return DraftFile( created_at=utcnow(), @@ -136,6 +160,6 @@ def retrieve_http( tag=tag, size=content_length, content_type=response.headers.get("content-type"), - stream=BytesIO(content), + stream=stream, **file_attributes, ) diff --git a/ingestify/tests/test_http_fetch.py b/ingestify/tests/test_http_fetch.py new file mode 100644 index 0000000..b02c000 --- /dev/null +++ b/ingestify/tests/test_http_fetch.py @@ -0,0 +1,69 @@ +import gzip +from unittest.mock import MagicMock, patch + +import pytest + +from ingestify.infra.fetch.http import retrieve_http +from ingestify.utils import BufferedStream + + +def make_mock_response(content, status_code=200, headers=None): + headers = headers or {} + mock = MagicMock() + mock.status_code = status_code + mock.headers = MagicMock() + mock.headers.get = lambda key, default=None: headers.get(key, default) + mock.headers.__contains__ = lambda self, key: key in headers + mock.raise_for_status = MagicMock() + mock.iter_content = lambda chunk_size=1: [content] + return mock + + +FILE_KWARGS = dict( + file_data_feed_key="test", + file_data_spec_version="v1", + file_data_serialization_format="json", +) + +PLAIN_JSON = b'{"key": "value"}' * 100 + + +def test_decompress_false_stores_gzip_as_is(): + compressed = gzip.compress(PLAIN_JSON) + + with patch("ingestify.infra.fetch.http.get_session") as mock_session: + mock_session.return_value.get.return_value = make_mock_response(compressed) + result = retrieve_http("https://example.com/data.json.gz", **FILE_KWARGS) + + assert isinstance(result.stream, BufferedStream) + assert result.size == len(compressed) + assert result.stream.read() == compressed + + +def test_decompress_true_decompresses_and_sets_correct_size(): + compressed = gzip.compress(PLAIN_JSON) + + with patch("ingestify.infra.fetch.http.get_session") as mock_session: + mock_session.return_value.get.return_value = make_mock_response(compressed) + result = retrieve_http( + "https://s3.amazonaws.com/bucket/data.json.gz?X-Amz-Signature=abc123", + http_decompress=True, + **FILE_KWARGS, + ) + + assert isinstance(result.stream, BufferedStream) + assert result.size == len(PLAIN_JSON) + assert result.stream.read() == PLAIN_JSON + + +def test_decompress_true_corrupt_gzip_raises(): + corrupt = b"\x1f\x8b" + b"\x00" * 20 + + with patch("ingestify.infra.fetch.http.get_session") as mock_session: + mock_session.return_value.get.return_value = make_mock_response(corrupt) + with pytest.raises(Exception): + retrieve_http( + "https://example.com/data.json.gz", + http_decompress=True, + **FILE_KWARGS, + ) diff --git a/ingestify/utils.py b/ingestify/utils.py index 4d40b84..1ae146c 100644 --- a/ingestify/utils.py +++ b/ingestify/utils.py @@ -1,5 +1,7 @@ import logging import os +import shutil +import tempfile import time import re import traceback @@ -8,7 +10,7 @@ from datetime import datetime, timezone from string import Template -from typing import Dict, Tuple, Optional, Any, List +from typing import BinaryIO, Dict, Tuple, Optional, Any, List from pydantic import Field from typing_extensions import Self @@ -20,6 +22,28 @@ logger = logging.getLogger(__name__) +_DEFAULT_BUFFER_SIZE = 5 * 1024 * 1024 # 5MB before spilling to disk + + +class BufferedStream(tempfile.SpooledTemporaryFile): + """Stays in memory up to max_size, then spills to disk. Drop-in for BytesIO for large streams.""" + + def __init__(self, max_size: int = _DEFAULT_BUFFER_SIZE): + super().__init__(max_size=max_size, mode="w+b") + + def write(self, data: bytes) -> int: + return super().write(data) + + def read(self, n: int = -1) -> bytes: + return super().read(n) + + @classmethod + def from_stream(cls, source: BinaryIO, max_size: int = _DEFAULT_BUFFER_SIZE) -> "BufferedStream": + buffer = cls(max_size=max_size) + shutil.copyfileobj(source, buffer) + buffer.seek(0) + return buffer + def chunker(it, size): iterator = iter(it) From e69b9a7d0e9a62da3805deea6e18f6b89f80e7f6 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Thu, 12 Mar 2026 19:15:25 +0100 Subject: [PATCH 2/3] feat: low-memory ingestion using BufferedStream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace BytesIO with BufferedStream (SpooledTemporaryFile, 5MB threshold) throughout the fetch and store pipeline to avoid loading large files into memory. - Add BufferedStream to utils: stays in memory up to 5MB, spills to disk - Stream HTTP response via iter_content(1MB chunks) into BufferedStream, hashing on the fly — no more response.content loading full body into memory - Detect gzip content (magic bytes) once in retrieve_http, store as DraftFile.content_compression_method — no re-reading the stream later - Gzip files are stored as-is (no recompression CPU cost); size is read from the gzip trailer so file.size always reflects uncompressed data size - _prepare_write_stream uses content_compression_method to skip compression for already-compressed files, and returns the actual compression_method used so File metadata is always correct - DraftFile.stream typed as BufferedStream with coercing validator for backwards compatibility (accepts BytesIO, bytes, or any readable) --- ingestify/application/dataset_store.py | 22 +++++++++------ ingestify/domain/models/dataset/file.py | 1 + ingestify/infra/fetch/http.py | 27 +++++++++--------- ingestify/tests/test_http_fetch.py | 37 ++++++------------------- 4 files changed, 36 insertions(+), 51 deletions(-) diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index 7e9cf66..10c33aa 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -299,7 +299,15 @@ def iter_dataset_collection_batches( # dataset = self.dataset_repository. # self.dataset_repository.destroy_dataset(dataset_id) - def _prepare_write_stream(self, file_: DraftFile) -> tuple[BinaryIO, int, str]: + def _prepare_write_stream(self, file_: DraftFile) -> tuple[BinaryIO, int, str, Optional[str]]: + if file_.content_compression_method == "gzip": + # Already gzip - store as-is, no CPU cost + stream = file_.stream + stream.seek(0, os.SEEK_END) + storage_size = stream.tell() + stream.seek(0) + return stream, storage_size, ".gz", "gzip" + if self.storage_compression_method == "gzip": stream = BufferedStream() with gzip.GzipFile(fileobj=stream, compresslevel=9, mode="wb") as fp: @@ -308,13 +316,9 @@ def _prepare_write_stream(self, file_: DraftFile) -> tuple[BinaryIO, int, str]: stream.seek(0, os.SEEK_END) storage_size = stream.tell() stream.seek(0) - suffix = ".gz" - else: - stream = file_.stream - storage_size = file_.size - suffix = "" + return stream, storage_size, ".gz", "gzip" - return stream, storage_size, suffix + return file_.stream, file_.size, "", None def _prepare_read_stream( self, @@ -356,7 +360,7 @@ def _persist_files( # File didn't change. Ignore it. continue - stream, storage_size, suffix = self._prepare_write_stream(file_) + stream, storage_size, suffix, compression_method = self._prepare_write_stream(file_) # TODO: check if this is a very clean way to go from DraftFile to File full_path = self.file_repository.save_content( @@ -370,7 +374,7 @@ def _persist_files( file_, file_id, storage_size=storage_size, - storage_compression_method=self.storage_compression_method, + storage_compression_method=compression_method, path=self.file_repository.get_relative_path(full_path), ) diff --git a/ingestify/domain/models/dataset/file.py b/ingestify/domain/models/dataset/file.py index 9c6a857..f893b20 100644 --- a/ingestify/domain/models/dataset/file.py +++ b/ingestify/domain/models/dataset/file.py @@ -19,6 +19,7 @@ class DraftFile(BaseModel): data_feed_key: str # Example: 'events' data_spec_version: str # Example: 'v3' data_serialization_format: str # Example: 'json' + content_compression_method: Optional[str] = None # Example: 'gzip' stream: BufferedStream @field_validator("stream", mode="before") diff --git a/ingestify/infra/fetch/http.py b/ingestify/infra/fetch/http.py index bb51ddd..37a8164 100644 --- a/ingestify/infra/fetch/http.py +++ b/ingestify/infra/fetch/http.py @@ -1,6 +1,4 @@ -import gzip import json -import shutil from datetime import datetime from email.utils import format_datetime, parsedate from hashlib import sha1 @@ -42,15 +40,12 @@ def get_session(): return _session -def _decompress(source: BinaryIO) -> tuple[BufferedStream, int]: - """Stream-decompress gzip content into a BufferedStream, returning (stream, uncompressed_size).""" - stream = BufferedStream() - with gzip.GzipFile(fileobj=source, mode="rb") as gz: - shutil.copyfileobj(gz, stream) - stream.seek(0, 2) - size = stream.tell() +def _uncompressed_size_from_gzip_trailer(stream: BinaryIO) -> int: + """Read uncompressed size from the gzip trailer (last 4 bytes, mod 2^32).""" + stream.seek(-4, 2) + size = int.from_bytes(stream.read(4), "little") stream.seek(0) - return stream, size + return size def retrieve_http( @@ -87,7 +82,6 @@ def retrieve_http( raise Exception(f"Don't know how to use {key}") ignore_not_found = http_kwargs.pop("ignore_not_found", False) - decompress = http_kwargs.pop("decompress", False) response = get_session().get(url, headers=headers, stream=True, **http_kwargs) if response.status_code == 404 and ignore_not_found: @@ -146,13 +140,17 @@ def retrieve_http( return NotModifiedFile(modified_at=last_modified, reason="tag matched current_file") raw_stream.seek(0) - if decompress: - stream, content_length = _decompress(raw_stream) + header = raw_stream.read(2) + raw_stream.seek(0) + if header == b"\x1f\x8b": + content_compression_method = "gzip" + content_length = _uncompressed_size_from_gzip_trailer(raw_stream) else: + content_compression_method = None raw_stream.seek(0, 2) content_length = raw_stream.tell() raw_stream.seek(0) - stream = raw_stream + stream = raw_stream return DraftFile( created_at=utcnow(), @@ -160,6 +158,7 @@ def retrieve_http( tag=tag, size=content_length, content_type=response.headers.get("content-type"), + content_compression_method=content_compression_method, stream=stream, **file_attributes, ) diff --git a/ingestify/tests/test_http_fetch.py b/ingestify/tests/test_http_fetch.py index b02c000..f20f84f 100644 --- a/ingestify/tests/test_http_fetch.py +++ b/ingestify/tests/test_http_fetch.py @@ -28,42 +28,23 @@ def make_mock_response(content, status_code=200, headers=None): PLAIN_JSON = b'{"key": "value"}' * 100 -def test_decompress_false_stores_gzip_as_is(): - compressed = gzip.compress(PLAIN_JSON) - +def test_plain_content_size_and_stream(): with patch("ingestify.infra.fetch.http.get_session") as mock_session: - mock_session.return_value.get.return_value = make_mock_response(compressed) - result = retrieve_http("https://example.com/data.json.gz", **FILE_KWARGS) + mock_session.return_value.get.return_value = make_mock_response(PLAIN_JSON) + result = retrieve_http("https://example.com/data.json", **FILE_KWARGS) assert isinstance(result.stream, BufferedStream) - assert result.size == len(compressed) - assert result.stream.read() == compressed + assert result.size == len(PLAIN_JSON) + assert result.stream.read() == PLAIN_JSON -def test_decompress_true_decompresses_and_sets_correct_size(): +def test_gzip_content_stored_as_is_with_uncompressed_size(): compressed = gzip.compress(PLAIN_JSON) with patch("ingestify.infra.fetch.http.get_session") as mock_session: mock_session.return_value.get.return_value = make_mock_response(compressed) - result = retrieve_http( - "https://s3.amazonaws.com/bucket/data.json.gz?X-Amz-Signature=abc123", - http_decompress=True, - **FILE_KWARGS, - ) + result = retrieve_http("https://example.com/data.json.gz", **FILE_KWARGS) assert isinstance(result.stream, BufferedStream) - assert result.size == len(PLAIN_JSON) - assert result.stream.read() == PLAIN_JSON - - -def test_decompress_true_corrupt_gzip_raises(): - corrupt = b"\x1f\x8b" + b"\x00" * 20 - - with patch("ingestify.infra.fetch.http.get_session") as mock_session: - mock_session.return_value.get.return_value = make_mock_response(corrupt) - with pytest.raises(Exception): - retrieve_http( - "https://example.com/data.json.gz", - http_decompress=True, - **FILE_KWARGS, - ) + assert result.size == len(PLAIN_JSON) # uncompressed size from gzip trailer + assert result.stream.read() == compressed # stored as-is From b421abb549a3f0020d2b283e770836b18e9a9227 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Tue, 17 Mar 2026 11:41:23 +0100 Subject: [PATCH 3/3] refactor: detect_compression and gzip_uncompressed_size utilities - Extract detect_compression() and gzip_uncompressed_size() into utils - Set DraftFile.content_compression_method once on fetch, used by store - Add tests for detect_compression, gzip_uncompressed_size, and http fetch - Format with black --- ingestify/application/dataset_store.py | 11 ++++++-- ingestify/domain/models/dataset/file.py | 7 ++++- ingestify/infra/fetch/http.py | 36 ++++++++++++------------- ingestify/tests/test_http_fetch.py | 11 +++++++- ingestify/tests/test_utils.py | 29 ++++++++++++++++++++ ingestify/utils.py | 21 ++++++++++++++- 6 files changed, 92 insertions(+), 23 deletions(-) create mode 100644 ingestify/tests/test_utils.py diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index 10c33aa..ffc6b46 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -299,7 +299,9 @@ def iter_dataset_collection_batches( # dataset = self.dataset_repository. # self.dataset_repository.destroy_dataset(dataset_id) - def _prepare_write_stream(self, file_: DraftFile) -> tuple[BinaryIO, int, str, Optional[str]]: + def _prepare_write_stream( + self, file_: DraftFile + ) -> tuple[BinaryIO, int, str, Optional[str]]: if file_.content_compression_method == "gzip": # Already gzip - store as-is, no CPU cost stream = file_.stream @@ -360,7 +362,12 @@ def _persist_files( # File didn't change. Ignore it. continue - stream, storage_size, suffix, compression_method = self._prepare_write_stream(file_) + ( + stream, + storage_size, + suffix, + compression_method, + ) = self._prepare_write_stream(file_) # TODO: check if this is a very clean way to go from DraftFile to File full_path = self.file_repository.save_content( diff --git a/ingestify/domain/models/dataset/file.py b/ingestify/domain/models/dataset/file.py index f893b20..e2e32f8 100644 --- a/ingestify/domain/models/dataset/file.py +++ b/ingestify/domain/models/dataset/file.py @@ -136,7 +136,12 @@ class LoadedFile(BaseModel): data_serialization_format: Optional[str] # Example: 'json' storage_compression_method: Optional[str] # Example: 'gzip' storage_path: Path - stream_: Union[BinaryIO, BytesIO, BufferedStream, Callable[[], Awaitable[Union[BinaryIO, BytesIO, BufferedStream]]]] + stream_: Union[ + BinaryIO, + BytesIO, + BufferedStream, + Callable[[], Awaitable[Union[BinaryIO, BytesIO, BufferedStream]]], + ] revision_id: Optional[int] = None # This can be used when a Revision is squashed def load_stream(self): diff --git a/ingestify/infra/fetch/http.py b/ingestify/infra/fetch/http.py index 37a8164..0ef3493 100644 --- a/ingestify/infra/fetch/http.py +++ b/ingestify/infra/fetch/http.py @@ -11,7 +11,12 @@ from ingestify.domain.models import DraftFile, File from ingestify.domain.models.dataset.file import NotModifiedFile -from ingestify.utils import utcnow, BufferedStream +from ingestify.utils import ( + utcnow, + BufferedStream, + detect_compression, + gzip_uncompressed_size, +) _session = None @@ -40,14 +45,6 @@ def get_session(): return _session -def _uncompressed_size_from_gzip_trailer(stream: BinaryIO) -> int: - """Read uncompressed size from the gzip trailer (last 4 bytes, mod 2^32).""" - stream.seek(-4, 2) - size = int.from_bytes(stream.read(4), "little") - stream.seek(0) - return size - - def retrieve_http( url, current_file: Optional[File] = None, @@ -116,13 +113,17 @@ def retrieve_http( if not next_url: break else: - response = requests.get(next_url, headers=headers, stream=True, **http_kwargs) + response = requests.get( + next_url, headers=headers, stream=True, **http_kwargs + ) content_bytes = json.dumps({data_path: data}).encode("utf-8") if not tag: tag = sha1(content_bytes).hexdigest() if current_file and current_file.tag == tag: - return NotModifiedFile(modified_at=last_modified, reason="tag matched current_file") + return NotModifiedFile( + modified_at=last_modified, reason="tag matched current_file" + ) stream = BufferedStream.from_stream(BytesIO(content_bytes)) content_length = len(content_bytes) else: @@ -137,16 +138,15 @@ def retrieve_http( tag = hasher.hexdigest() if current_file and current_file.tag == tag: - return NotModifiedFile(modified_at=last_modified, reason="tag matched current_file") + return NotModifiedFile( + modified_at=last_modified, reason="tag matched current_file" + ) raw_stream.seek(0) - header = raw_stream.read(2) - raw_stream.seek(0) - if header == b"\x1f\x8b": - content_compression_method = "gzip" - content_length = _uncompressed_size_from_gzip_trailer(raw_stream) + content_compression_method = detect_compression(raw_stream) + if content_compression_method == "gzip": + content_length = gzip_uncompressed_size(raw_stream) else: - content_compression_method = None raw_stream.seek(0, 2) content_length = raw_stream.tell() raw_stream.seek(0) diff --git a/ingestify/tests/test_http_fetch.py b/ingestify/tests/test_http_fetch.py index f20f84f..cdafafc 100644 --- a/ingestify/tests/test_http_fetch.py +++ b/ingestify/tests/test_http_fetch.py @@ -46,5 +46,14 @@ def test_gzip_content_stored_as_is_with_uncompressed_size(): result = retrieve_http("https://example.com/data.json.gz", **FILE_KWARGS) assert isinstance(result.stream, BufferedStream) - assert result.size == len(PLAIN_JSON) # uncompressed size from gzip trailer + assert result.content_compression_method == "gzip" + assert result.size == len(PLAIN_JSON) # uncompressed size from gzip trailer assert result.stream.read() == compressed # stored as-is + + +def test_plain_content_has_no_compression_method(): + with patch("ingestify.infra.fetch.http.get_session") as mock_session: + mock_session.return_value.get.return_value = make_mock_response(PLAIN_JSON) + result = retrieve_http("https://example.com/data.json", **FILE_KWARGS) + + assert result.content_compression_method is None diff --git a/ingestify/tests/test_utils.py b/ingestify/tests/test_utils.py new file mode 100644 index 0000000..6007d75 --- /dev/null +++ b/ingestify/tests/test_utils.py @@ -0,0 +1,29 @@ +import gzip +from io import BytesIO + +from ingestify.utils import BufferedStream, detect_compression, gzip_uncompressed_size + +PLAIN = b'{"key": "value"}' * 100 + + +def to_stream(data: bytes) -> BufferedStream: + return BufferedStream.from_stream(BytesIO(data)) + + +def test_detect_compression_gzip(): + assert detect_compression(to_stream(gzip.compress(PLAIN))) == "gzip" + + +def test_detect_compression_plain(): + assert detect_compression(to_stream(PLAIN)) is None + + +def test_detect_compression_resets_position(): + stream = to_stream(gzip.compress(PLAIN)) + detect_compression(stream) + assert stream.tell() == 0 + + +def test_gzip_uncompressed_size(): + compressed = gzip.compress(PLAIN) + assert gzip_uncompressed_size(to_stream(compressed)) == len(PLAIN) diff --git a/ingestify/utils.py b/ingestify/utils.py index 1ae146c..75b04ab 100644 --- a/ingestify/utils.py +++ b/ingestify/utils.py @@ -38,13 +38,32 @@ def read(self, n: int = -1) -> bytes: return super().read(n) @classmethod - def from_stream(cls, source: BinaryIO, max_size: int = _DEFAULT_BUFFER_SIZE) -> "BufferedStream": + def from_stream( + cls, source: BinaryIO, max_size: int = _DEFAULT_BUFFER_SIZE + ) -> "BufferedStream": buffer = cls(max_size=max_size) shutil.copyfileobj(source, buffer) buffer.seek(0) return buffer +def gzip_uncompressed_size(stream: BinaryIO) -> int: + """Read uncompressed size from the gzip trailer (last 4 bytes, mod 2^32).""" + stream.seek(-4, 2) + size = int.from_bytes(stream.read(4), "little") + stream.seek(0) + return size + + +def detect_compression(stream: BinaryIO) -> Optional[str]: + """Detect compression method by reading magic bytes. Resets stream position afterwards.""" + header = stream.read(2) + stream.seek(0) + if header == b"\x1f\x8b": + return "gzip" + return None + + def chunker(it, size): iterator = iter(it) while chunk := list(islice(iterator, size)):