chore: parse in threadpool instead of on event loop #802
Conversation
Pull Request Overview
This PR offloads document parsing to a thread pool to improve concurrency across vectorizers by making parsing methods asynchronous and running CPU-bound work off the event loop.
- Changed the `parsing.parse` interface to `async` and awaited it in the embedding generator.
- Introduced a global `ThreadPoolExecutor` and updated `parse_doc` implementations to use `run_in_executor`.
- Deferred parsing imports and wrapped blocking logic in synchronous helper methods.
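The offloading pattern the PR describes can be sketched roughly as follows. This is an illustrative sketch, not the PR's actual code; names like `Parser` and `_parse_sync` are assumptions:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Global thread pool, as described in the PR (illustrative sketch)
_PARSING_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="parsing")

class Parser:
    def _parse_sync(self, payload: str) -> str:
        # CPU-bound parsing would run here, off the event loop
        return payload.upper()

    async def parse(self, payload: str) -> str:
        # Offload the blocking helper to the pool and await the result
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(_PARSING_EXECUTOR, self._parse_sync, payload)

result = asyncio.run(Parser().parse("doc"))
print(result)  # DOC
```

While a parse runs in a worker thread, the event loop stays free to service other vectorizers, which is the concurrency win the PR is after.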
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| projects/pgai/pgai/vectorizer/vectorizer/vectorizer.py | Await the now-async parsing.parse call in _generate_embeddings. |
| projects/pgai/pgai/vectorizer/parsing.py | Converted parse methods to async, added _PARSING_EXECUTOR, and offloaded blocking work to threads. |
Comments suppressed due to low confidence (2)
projects/pgai/pgai/vectorizer/parsing.py:20
- Please add or update unit tests to cover the new async `parse` methods and validate that blocking logic is correctly executed in the thread pool.

```python
async def parse(self, _1: dict[str, Any], payload: str | LoadedDocument) -> str:
```
projects/pgai/pgai/vectorizer/vectorizer/vectorizer.py:1191
- Since `parse` is now async, confirm all other callers of `parsing.parse` have been updated to `await` this method, and update any related documentation to reflect this breaking change.

```python
payload = await self.vectorizer.config.parsing.parse(item, payload)
```
```python
from pgai.vectorizer.loading import LoadedDocument

# Thread pool for CPU-intensive parsing operations
_PARSING_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="parsing")
```
[nitpick] Hardcoding `max_workers=4` may not scale across environments; consider making this configurable or using `os.cpu_count()` to align with available cores.
Suggested change:

```python
max_workers = int(os.getenv("PARSING_MAX_WORKERS", os.cpu_count() or 4))
_PARSING_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="parsing")
```
That's actually a good suggestion. Good bot.
The `os.cpu_count()` doesn't make a lot of sense since Python is single threaded. This just configures how many documents can be parsed in parallel. But I'll make it configurable.
If the parsing is CPU bound, why not use a `ProcessPoolExecutor` instead?
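For reference, swapping in a process pool is mostly a one-line change at the call site, since `loop.run_in_executor` accepts any `concurrent.futures.Executor`; the trade-off is that arguments and results must be pickled across the process boundary. A sketch, with illustrative names (`parse_sync` is not the PR's code):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def parse_sync(payload: str) -> str:
    # Must be a module-level (picklable) function to run in a process pool
    return payload.upper()

async def parse(pool: ProcessPoolExecutor, payload: str) -> str:
    loop = asyncio.get_running_loop()
    # Same run_in_executor call; only the executor type changes.
    # Arguments and results are pickled across the process boundary.
    return await loop.run_in_executor(pool, parse_sync, payload)

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(asyncio.run(parse(pool, "doc")))
```

The `__main__` guard is required: process pools re-import the module in child processes on some start methods.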
Nit: Python is not single-threaded; it uses a lock (the GIL) to keep threads from executing Python bytecode in parallel. If you have IO-bound tasks (reading files, network requests), the GIL is released, so threads do give concurrency benefits.
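A quick demonstration of that point: `time.sleep` releases the GIL the same way blocking IO does, so four 0.2-second tasks on four threads finish in roughly 0.2 seconds of wall time, not 0.8:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io() -> str:
    # time.sleep releases the GIL, like file reads or network calls would
    time.sleep(0.2)
    return "done"

start = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda _: blocking_io(), range(4)))
elapsed = time.monotonic() - start

print(results)
print(elapsed < 0.5)  # True: the four sleeps overlapped across threads
```

Pure-Python CPU work, by contrast, would serialize on the GIL and take the full 0.8 seconds; whether parsing counts as "CPU bound" here depends on how much of it happens inside GIL-releasing C extensions.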
Okay, so more precisely: Python won't utilize more than one core to execute that parsing. I think the argument still stands. The cloud lambda also runs with only one core, so multiprocessing instead of threading is just additional overhead. I think.
The cloud lambda is configured with 2560 MiB of RAM, which according to this Stack Overflow post should correspond to two cores.
Force-pushed from 5e54e3b to a997ed6.
```python
# Thread pool for CPU-intensive parsing operations
max_workers = int(os.getenv("PARSING_MAX_WORKERS", 4))
_PARSING_EXECUTOR = ThreadPoolExecutor(
```
I mentioned it here, but why not use a `ProcessPoolExecutor`?
Force-pushed from 35e80a5 to f15011e.
This doesn't make use of multiple CPU cores, but it does allow other vectorizers to continue running concurrently. If we push down e.g. processing time, it would also let us stop somewhat gracefully (although you can't really kill a running future).
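As an aside on the "can't really kill a running future" point: `Executor.shutdown(cancel_futures=True)` (Python 3.9+) drops queued work but lets already-running tasks run to completion, which is roughly what "somewhat gracefully" amounts to in practice. A minimal demonstration:

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(time.sleep, 0.2)   # starts immediately on the only worker
pending = pool.submit(time.sleep, 0.2)   # queued behind it

time.sleep(0.05)  # give the first task time to start
pool.shutdown(wait=True, cancel_futures=True)

print(running.cancelled())  # False: a future that has started cannot be cancelled
print(pending.cancelled())  # True: queued-but-unstarted work is dropped
```

So a graceful stop can refuse new parse jobs and drain the queue, but the parse currently executing in a worker thread will always finish (or raise) on its own.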