From b5af973891b9c97301f888b6e5cd4b077d7a456a Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 21 May 2026 17:32:21 +0530 Subject: [PATCH 1/3] fix(memory/ingestion): bound the job channel + reject submits at cap Producers can DoS the core today by calling `put_doc` or `store_skill_sync` faster than the worker drains; the channel was `mpsc::unbounded_channel`, so a runaway loop would grow the in-flight buffer (each `IngestionJob` owns a full document body) until the process OOMs. Switch the channel to `mpsc::channel(DEFAULT_QUEUE_CAPACITY)` (512) and change `submit` from `tx.send()` to `tx.try_send()`. The two drop reasons (`Full`, `Closed`) are logged with distinct messages so observability can tell over-pressure apart from worker shutdown. Both paths roll `IngestionState::dequeue()` back so the `memory_ingestion_status` queue-depth gauge stays accurate under sustained overflow. `start_worker_with_capacity` is exposed (in addition to `start_worker_with_state`, which now delegates with the default cap) so unit tests can drive the at-capacity branch deterministically without faking a slow worker. Tests added in the same file: capacity enforcement, recovery after drain, channel-closed drop accounting, and a guardrail on `DEFAULT_QUEUE_CAPACITY` so future bumps don't regress the memory-ceiling intent. Closes #2442 --- src/openhuman/memory/ingestion/queue.rs | 207 ++++++++++++++++++++++-- 1 file changed, 190 insertions(+), 17 deletions(-) diff --git a/src/openhuman/memory/ingestion/queue.rs b/src/openhuman/memory/ingestion/queue.rs index 28a8bd8ecb..3f4282d01e 100644 --- a/src/openhuman/memory/ingestion/queue.rs +++ b/src/openhuman/memory/ingestion/queue.rs @@ -4,8 +4,12 @@ //! dedicated worker thread. This ensures that `doc_put` callers never block //! on the heavier parsing and graph-write path. //! -//! The queue uses a `tokio::sync::mpsc` channel to decouple document submission -//! from the actual extraction process. +//! 
The queue uses a bounded `tokio::sync::mpsc` channel +//! ([`DEFAULT_QUEUE_CAPACITY`]) to decouple document submission from the +//! actual extraction process. Producers call [`IngestionQueue::submit`], +//! which is non-blocking; when the buffer is full the job is dropped with a +//! warn-level log so a runaway producer cannot grow the queue without bound +//! and exhaust process memory. use std::sync::Arc; use std::time::Instant; @@ -17,6 +21,22 @@ use super::MemoryIngestionConfig; use crate::core::event_bus::{publish_global, DomainEvent}; use crate::openhuman::memory::store::{NamespaceDocumentInput, UnifiedMemory}; +/// Default capacity of the ingestion job channel. +/// +/// Producers (`put_doc`, `store_skill_sync`) push jobs into this channel +/// without blocking; the worker drains them one-at-a-time under the +/// `IngestionState` singleton lock because the local extraction LLM cannot +/// run concurrently. A buggy or compromised producer can submit jobs much +/// faster than the worker drains them, so the channel must enforce an +/// explicit cap or the queue grows without bound and exhausts process +/// memory (each [`IngestionJob`] holds an owned document body). +/// +/// 512 is a deliberate middle ground: it absorbs reasonable bulk-import +/// bursts (e.g. backfilling a Notion workspace or a long Slack history) +/// without letting a runaway loop balloon RSS — at typical document sizes +/// of 1–100 KB the in-flight buffer tops out at roughly 50 MB (512 × 100 KB ≈ 51 MB). +pub const DEFAULT_QUEUE_CAPACITY: usize = 512; + /// A job submitted to the ingestion worker. /// /// Contains all the necessary information to process a document for graph @@ -34,12 +54,15 @@ pub struct IngestionJob { /// Handle used by callers to submit ingestion jobs. /// -/// This is a thin wrapper around a `tokio::sync::mpsc::UnboundedSender` and -/// can be cloned freely to be shared across multiple producers. 
+/// This is a thin wrapper around a bounded `tokio::sync::mpsc::Sender` and +/// can be cloned freely to be shared across multiple producers. The bound +/// (see [`DEFAULT_QUEUE_CAPACITY`]) protects the core from runaway +/// producers; once the buffer is full, [`Self::submit`] returns `false` +/// instead of blocking or growing the queue. #[derive(Clone)] pub struct IngestionQueue { - /// Sender half of the job queue channel. - tx: mpsc::UnboundedSender<IngestionJob>, + /// Sender half of the bounded job queue channel. + tx: mpsc::Sender<IngestionJob>, /// Shared state — singleton lock, queue depth, status snapshot. state: IngestionState, } @@ -54,18 +77,39 @@ impl IngestionQueue { /// # Returns /// /// Returns `true` if the job was successfully enqueued, `false` if the - /// worker has shut down (e.g., during application termination) and the - /// job was dropped. + /// queue is full (capacity reached) or the worker has shut down (e.g., + /// during application termination). In both drop cases the job is not + /// persisted into the extraction pipeline — the underlying document + /// upsert that the caller already performed is unaffected. The queue + /// depth counter is restored before returning so the + /// `memory_ingestion_status` RPC stays accurate. pub fn submit(&self, job: IngestionJob) -> bool { self.state.enqueue(); - match self.tx.send(job) { + match self.tx.try_send(job) { Ok(()) => true, - Err(e) => { - // Worker is gone — undo the enqueue bump so depth stays accurate. + Err(mpsc::error::TrySendError::Full(dropped)) => { + // Channel is at capacity — log loudly so observability can + // surface the drop, then undo the enqueue bump so the queue + // depth gauge does not drift upward forever under sustained + // overflow. 
+ self.state.dequeue(); + log::warn!( + "[memory:ingestion_queue] dropping job: queue at capacity ({} pending) namespace={} title={}", + self.tx.max_capacity(), + dropped.document.namespace, + dropped.document.title, + ); + false + } + Err(mpsc::error::TrySendError::Closed(dropped)) => { + // Worker is gone — same accounting as the full case, but a + // different reason worth distinguishing in logs because it + // means the entire pipeline is dead, not just over-pressure. self.state.dequeue(); log::warn!( - "[memory:ingestion_queue] failed to enqueue job (worker gone?): {}", - e.0.document.title, + "[memory:ingestion_queue] dropping job: worker channel closed (shutdown?) namespace={} title={}", + dropped.document.namespace, + dropped.document.title, ); false } @@ -98,16 +142,27 @@ pub fn start_worker(memory: Arc<UnifiedMemory>) -> IngestionQueue { /// Start a worker bound to a caller-supplied [`IngestionState`]. Useful when /// the synchronous ingest path needs to share the same singleton lock and -/// snapshot as the queue worker. +/// snapshot as the queue worker. Uses [`DEFAULT_QUEUE_CAPACITY`]. pub fn start_worker_with_state( memory: Arc<UnifiedMemory>, state: IngestionState, ) -> IngestionQueue { - let (tx, rx) = mpsc::unbounded_channel::<IngestionJob>(); + start_worker_with_capacity(memory, state, DEFAULT_QUEUE_CAPACITY) +} + +/// Start a worker with an explicit channel capacity. Exposed so unit tests +/// can drive the at-capacity drop path deterministically without faking a +/// slow worker. +pub fn start_worker_with_capacity( + memory: Arc<UnifiedMemory>, + state: IngestionState, + capacity: usize, +) -> IngestionQueue { + let (tx, rx) = mpsc::channel::<IngestionJob>(capacity); tokio::spawn(ingestion_worker(memory, rx, state.clone())); - log::info!("[memory:ingestion_queue] background worker started"); + log::info!("[memory:ingestion_queue] background worker started capacity={capacity}"); IngestionQueue { tx, state } } @@ -122,7 +177,7 @@ pub fn start_worker_with_state( /// * `rx` - The receiver half of the job queue channel. 
async fn ingestion_worker( memory: Arc<UnifiedMemory>, - mut rx: mpsc::UnboundedReceiver<IngestionJob>, + mut rx: mpsc::Receiver<IngestionJob>, state: IngestionState, ) { log::debug!("[memory:ingestion_queue] worker loop entered"); @@ -198,3 +253,121 @@ async fn ingestion_worker( log::info!("[memory:ingestion_queue] worker shut down (channel closed)"); } + +#[cfg(test)] +mod tests { + //! Channel-bound tests. These build an [`IngestionQueue`] from a raw + //! `mpsc::channel` without spawning a worker — that lets the suite drive + //! the at-capacity and channel-closed branches deterministically without + //! standing up a real `UnifiedMemory` or contending with a draining task. + use super::*; + + use serde_json::json; + + fn fixture_job(title: &str) -> IngestionJob { + IngestionJob { + document_id: format!("doc-{title}"), + document: NamespaceDocumentInput { + namespace: "skill-test".to_string(), + key: title.to_string(), + title: title.to_string(), + content: "body".to_string(), + source_type: "doc".to_string(), + priority: "medium".to_string(), + tags: Vec::new(), + metadata: json!({}), + category: "core".to_string(), + session_id: None, + document_id: None, + }, + config: MemoryIngestionConfig::default(), + } + } + + /// Build a queue plus its receiver, deliberately without spawning a + /// worker, so tests can keep jobs in the channel. + fn channel_only_queue(capacity: usize) -> (IngestionQueue, mpsc::Receiver<IngestionJob>) { + let (tx, rx) = mpsc::channel::<IngestionJob>(capacity); + let queue = IngestionQueue { + tx, + state: IngestionState::new(), + }; + (queue, rx) + } + + #[tokio::test] + async fn submit_succeeds_until_capacity_then_drops() { + let (queue, _rx) = channel_only_queue(2); + + assert!(queue.submit(fixture_job("a")), "first submit must enqueue"); + assert!(queue.submit(fixture_job("b")), "second submit must enqueue"); + + // Channel is now full. tokio's bounded mpsc reserves one slot per + // permit, so capacity=2 means at most two pending; the third must be + // rejected with `false`. 
+ assert!( + !queue.submit(fixture_job("c")), + "submit at capacity must return false (drop)" + ); + + // queue_depth must reflect only the accepted jobs — the drop path + // is required to decrement so the status RPC does not drift upward. + assert_eq!( + queue.state().snapshot().queue_depth, + 2, + "queue_depth must roll back on overflow drop" + ); + } + + #[tokio::test] + async fn submit_recovers_after_drain() { + let (queue, mut rx) = channel_only_queue(1); + + assert!(queue.submit(fixture_job("first"))); + assert!( + !queue.submit(fixture_job("over")), + "second submit at cap=1 must drop" + ); + + // Drain the receiver to free a slot. + let pulled = rx.try_recv().expect("first job must be readable"); + assert_eq!(pulled.document.title, "first"); + // Mirror the worker's accounting (queue depth -> dequeue) so the + // post-drain snapshot does not look like a leftover queued job. + queue.state().dequeue(); + + assert!( + queue.submit(fixture_job("after-drain")), + "submit after drain must enqueue" + ); + assert_eq!(queue.state().snapshot().queue_depth, 1); + } + + #[tokio::test] + async fn submit_after_worker_gone_returns_false() { + let (queue, rx) = channel_only_queue(4); + drop(rx); // simulate worker task exiting and dropping its receiver + + assert!( + !queue.submit(fixture_job("orphan")), + "submit must return false once the receiver is dropped" + ); + assert_eq!( + queue.state().snapshot().queue_depth, + 0, + "channel-closed drop path must roll the depth counter back" + ); + } + + #[test] + fn default_queue_capacity_is_bounded_and_reasonable() { + // Guardrail so future changes don't accidentally regress to an + // arbitrarily large default (or `usize::MAX`) without thinking about + // the producer-side memory bound. 
+ assert!(DEFAULT_QUEUE_CAPACITY > 0); + assert!( + DEFAULT_QUEUE_CAPACITY <= 8 * 1024, + "default capacity is the memory ceiling under sustained overflow — keep it tight" + ); + } +} From ea3022b7559014350ace4ddc6486c8a31ac2577d Mon Sep 17 00:00:00 2001 From: obchain Date: Fri, 22 May 2026 00:05:07 +0530 Subject: [PATCH 2/3] fix(memory/ingestion): guard start_worker against zero capacity + log doc_id on drop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeRabbit on #2444 flagged two follow-ups: 1. `tokio::sync::mpsc::channel(0)` panics with a cryptic Tokio-internal message ("mpsc bounded channel requires buffer > 0"). Add an explicit `assert!(capacity > 0, …)` in `start_worker_with_capacity` so misuse surfaces a clear, grep-friendly message at the call site instead of looking like a Tokio bug. New `#[should_panic]` test `start_worker_rejects_zero_capacity` pins the contract. 2. The drop-path warn logs now include `doc_id` alongside `namespace` and `title` so each warn line is a stable breadcrumb back to the upserted document whose graph-extraction follow-up was skipped. 5/5 tests pass; cargo fmt + check clean. --- src/openhuman/memory/ingestion/queue.rs | 39 +++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/src/openhuman/memory/ingestion/queue.rs b/src/openhuman/memory/ingestion/queue.rs index 3f4282d01e..db9295b891 100644 --- a/src/openhuman/memory/ingestion/queue.rs +++ b/src/openhuman/memory/ingestion/queue.rs @@ -91,11 +91,14 @@ impl IngestionQueue { // Channel is at capacity — log loudly so observability can // surface the drop, then undo the enqueue bump so the queue // depth gauge does not drift upward forever under sustained - // overflow. + // overflow. Include the stable `document_id` so the warn + // line is the breadcrumb back to the upserted document + // whose graph-extraction follow-up was skipped. 
self.state.dequeue(); log::warn!( - "[memory:ingestion_queue] dropping job: queue at capacity ({} pending) namespace={} title={}", + "[memory:ingestion_queue] dropping job: queue at capacity ({} pending) doc_id={} namespace={} title={}", self.tx.max_capacity(), + dropped.document_id, dropped.document.namespace, dropped.document.title, ); @@ -107,7 +110,8 @@ impl IngestionQueue { // means the entire pipeline is dead, not just over-pressure. self.state.dequeue(); log::warn!( - "[memory:ingestion_queue] dropping job: worker channel closed (shutdown?) namespace={} title={}", + "[memory:ingestion_queue] dropping job: worker channel closed (shutdown?) doc_id={} namespace={} title={}", + dropped.document_id, dropped.document.namespace, dropped.document.title, ); @@ -153,11 +157,21 @@ pub fn start_worker_with_state( /// Start a worker with an explicit channel capacity. Exposed so unit tests /// can drive the at-capacity drop path deterministically without faking a /// slow worker. +/// +/// # Panics +/// +/// Panics if `capacity == 0`. `tokio::sync::mpsc::channel` itself panics on +/// a zero buffer, but the message is cryptic; the explicit guard here turns +/// the misuse into a clear, grep-friendly assertion at the call site. 
pub fn start_worker_with_capacity( memory: Arc<UnifiedMemory>, state: IngestionState, capacity: usize, ) -> IngestionQueue { + assert!( + capacity > 0, + "ingestion queue capacity must be greater than zero" + ); let (tx, rx) = mpsc::channel::<IngestionJob>(capacity); tokio::spawn(ingestion_worker(memory, rx, state.clone())); @@ -370,4 +384,23 @@ mod tests { "default capacity is the memory ceiling under sustained overflow — keep it tight" ); } + + /// Zero capacity would otherwise panic from inside + /// `tokio::sync::mpsc::channel` with a cryptic Tokio-internal message + /// (`mpsc bounded channel requires buffer > 0`) — the explicit guard in + /// [`start_worker_with_capacity`] turns that into a clear, grep-friendly + /// assertion at the call site so misuse fails fast with an actionable + /// message instead of looking like a Tokio bug. + #[tokio::test] + #[should_panic(expected = "ingestion queue capacity must be greater than zero")] + async fn start_worker_rejects_zero_capacity() { + use crate::openhuman::embeddings::NoopEmbedding; + use tempfile::TempDir; + let tmp = TempDir::new().unwrap(); + let memory = UnifiedMemory::new(tmp.path(), Arc::new(NoopEmbedding), None).unwrap(); + // Panic must surface from our own assert, not from the Tokio + // channel constructor on the line after — that's the contract this + // test pins. + let _ = start_worker_with_capacity(Arc::new(memory), IngestionState::new(), 0); + } } From 5dccf16d454cd02af9a9545862e436537561b5dd Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sat, 23 May 2026 01:16:35 -0700 Subject: [PATCH 3/3] fix(memory/ingestion): tighten queue visibility and clarify cap log - start_worker_with_capacity is only used by start_worker_with_state and the in-crate unit tests; downgrade pub -> pub(crate) so it does not silently become public API surface (addresses @M3gA-Mind nit). - The Full-branch warn previously labeled self.capacity as "{} pending", which always equals the static cap in that branch and reads as a live depth metric. 
Rename to cap={} so operators grepping the warn see it as a configured bound, not a fluctuating gauge. --- src/openhuman/memory/ingestion/queue.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/openhuman/memory/ingestion/queue.rs b/src/openhuman/memory/ingestion/queue.rs index 624ecc3fd6..c342764443 100644 --- a/src/openhuman/memory/ingestion/queue.rs +++ b/src/openhuman/memory/ingestion/queue.rs @@ -101,7 +101,7 @@ impl IngestionQueue { // whose graph-extraction follow-up was skipped. self.state.dequeue(); log::warn!( - "[memory:ingestion_queue] dropping job: queue at capacity ({} pending) doc_id={} namespace={} title={}", + "[memory:ingestion_queue] dropping job: queue at capacity (cap={}) doc_id={} namespace={} title={}", self.capacity, dropped.document_id, dropped.document.namespace, @@ -178,7 +178,7 @@ pub fn start_worker_with_state( /// Panics if `capacity == 0`. `tokio::sync::mpsc::channel` itself panics on /// a zero buffer, but the message is cryptic; the explicit guard here turns /// the misuse into a clear, grep-friendly assertion at the call site. -pub fn start_worker_with_capacity( +pub(crate) fn start_worker_with_capacity( memory: Arc<UnifiedMemory>, state: IngestionState, capacity: usize,