diff --git a/src/openhuman/memory/ingestion/queue.rs b/src/openhuman/memory/ingestion/queue.rs index e979352d03..c342764443 100644 --- a/src/openhuman/memory/ingestion/queue.rs +++ b/src/openhuman/memory/ingestion/queue.rs @@ -4,8 +4,12 @@ //! dedicated worker thread. This ensures that `doc_put` callers never block //! on the heavier parsing and graph-write path. //! -//! The queue uses a `tokio::sync::mpsc` channel to decouple document submission -//! from the actual extraction process. +//! The queue uses a bounded `tokio::sync::mpsc` channel +//! ([`DEFAULT_QUEUE_CAPACITY`]) to decouple document submission from the +//! actual extraction process. Producers call [`IngestionQueue::submit`], +//! which is non-blocking; when the buffer is full the job is dropped with a +//! warn-level log so a runaway producer cannot grow the queue without bound +//! and exhaust process memory. use std::sync::Arc; use std::time::Instant; @@ -17,8 +21,20 @@ use super::MemoryIngestionConfig; use crate::core::event_bus::{publish_global, DomainEvent}; use crate::openhuman::memory::store::{NamespaceDocumentInput, UnifiedMemory}; -/// Default bounded-channel capacity for the ingestion queue. Sized to absorb -/// realistic bursts (bulk skill sync of ~200 docs) while capping memory usage. +/// Default capacity of the ingestion job channel. +/// +/// Producers (`put_doc`, `store_skill_sync`) push jobs into this channel +/// without blocking; the worker drains them one-at-a-time under the +/// `IngestionState` singleton lock because the local extraction LLM cannot +/// run concurrently. A buggy or compromised producer can submit jobs much +/// faster than the worker drains them, so the channel must enforce an +/// explicit cap or the queue grows without bound and exhausts process +/// memory (each [`IngestionJob`] holds an owned document body). +/// +/// 512 is a deliberate middle ground: it absorbs reasonable bulk-import +/// bursts (e.g. 
backfilling a Notion workspace or a long Slack history) +/// without letting a runaway loop balloon RSS — at typical document sizes +/// of 1–100 KB the in-flight buffer tops out at roughly 50 MB. pub const DEFAULT_QUEUE_CAPACITY: usize = 512; /// A job submitted to the ingestion worker. @@ -38,11 +54,14 @@ pub struct IngestionJob { /// Handle used by callers to submit ingestion jobs. /// -/// This is a thin wrapper around a `tokio::sync::mpsc::Sender` and -/// can be cloned freely to be shared across multiple producers. +/// This is a thin wrapper around a bounded `tokio::sync::mpsc::Sender` and +/// can be cloned freely to be shared across multiple producers. The bound +/// (see [`DEFAULT_QUEUE_CAPACITY`]) protects the core from runaway +/// producers; once the buffer is full, [`Self::submit`] returns `false` +/// instead of blocking or growing the queue. #[derive(Clone)] pub struct IngestionQueue { - /// Sender half of the job queue channel. + /// Sender half of the bounded job queue channel. tx: mpsc::Sender, /// Shared state — singleton lock, queue depth, status snapshot. state: IngestionState, @@ -63,24 +82,42 @@ impl IngestionQueue { /// # Returns /// /// Returns `true` if the job was successfully enqueued, `false` if the - /// queue is full (backpressure) or the worker has shut down. + /// queue is full (capacity reached) or the worker has shut down (e.g., + /// during application termination). In both drop cases the job is not + /// persisted into the extraction pipeline — the underlying document + /// upsert that the caller already performed is unaffected. The queue + /// depth counter is restored before returning so the + /// `memory_ingestion_status` RPC stays accurate. 
pub fn submit(&self, job: IngestionJob) -> bool { self.state.enqueue(); match self.tx.try_send(job) { Ok(()) => true, Err(mpsc::error::TrySendError::Full(dropped)) => { + // Channel is at capacity — log loudly so observability can + // surface the drop, then undo the enqueue bump so the queue + // depth gauge does not drift upward forever under sustained + // overflow. Include the stable `document_id` so the warn + // line is the breadcrumb back to the upserted document + // whose graph-extraction follow-up was skipped. self.state.dequeue(); log::warn!( - "[memory:ingestion_queue] queue full (capacity {}), dropping job: {}", + "[memory:ingestion_queue] dropping job: queue at capacity (cap={}) doc_id={} namespace={} title={}", self.capacity, + dropped.document_id, + dropped.document.namespace, dropped.document.title, ); false } Err(mpsc::error::TrySendError::Closed(dropped)) => { + // Worker is gone — same accounting as the full case, but a + // different reason worth distinguishing in logs because it + // means the entire pipeline is dead, not just over-pressure. self.state.dequeue(); log::warn!( - "[memory:ingestion_queue] failed to enqueue job (worker gone?): {}", + "[memory:ingestion_queue] dropping job: worker channel closed (shutdown?) doc_id={} namespace={} title={}", + dropped.document_id, + dropped.document.namespace, dropped.document.title, ); false @@ -124,7 +161,7 @@ pub fn start_worker(memory: Arc) -> IngestionQueue { /// Start a worker bound to a caller-supplied [`IngestionState`]. Useful when /// the synchronous ingest path needs to share the same singleton lock and -/// snapshot as the queue worker. +/// snapshot as the queue worker. Uses [`DEFAULT_QUEUE_CAPACITY`]. pub fn start_worker_with_state( memory: Arc, state: IngestionState, @@ -132,21 +169,29 @@ pub fn start_worker_with_state( start_worker_with_capacity(memory, state, DEFAULT_QUEUE_CAPACITY) } -/// Start a worker with an explicit channel capacity. 
Exposed for -/// deterministic tests that need a tiny queue to exercise backpressure. -pub fn start_worker_with_capacity( +/// Start a worker with an explicit channel capacity. Exposed so unit tests +/// can drive the at-capacity drop path deterministically without faking a +/// slow worker. +/// +/// # Panics +/// +/// Panics if `capacity == 0`. `tokio::sync::mpsc::channel` itself panics on +/// a zero buffer, but the message is cryptic; the explicit guard here turns +/// the misuse into a clear, grep-friendly assertion at the call site. +pub(crate) fn start_worker_with_capacity( memory: Arc, state: IngestionState, capacity: usize, ) -> IngestionQueue { + assert!( + capacity > 0, + "ingestion queue capacity must be greater than zero" + ); let (tx, rx) = mpsc::channel::(capacity); tokio::spawn(ingestion_worker(memory, rx, state.clone())); - log::debug!( - "[memory:ingestion_queue] background worker started (capacity={})", - capacity, - ); + log::info!("[memory:ingestion_queue] background worker started capacity={capacity}"); IngestionQueue { tx, state, @@ -244,88 +289,132 @@ async fn ingestion_worker( #[cfg(test)] mod tests { + //! Channel-bound tests. These build an [`IngestionQueue`] from a raw + //! `mpsc::channel` without spawning a worker — that lets the suite drive + //! the at-capacity and channel-closed branches deterministically without + //! standing up a real `UnifiedMemory` or contending with a draining task. use super::*; - use tokio::sync::mpsc; - #[tokio::test] - async fn submit_when_full_returns_false() { - // Capacity-1 channel, fill it, then submit another — exercises the Full branch. - let state = IngestionState::new(); - let (tx, _rx) = mpsc::channel::(1); - // Pre-fill the slot directly so submit() sees a full channel. 
- tx.try_send(make_dummy_job("filler")).ok(); + use serde_json::json; - let queue = IngestionQueue::from_parts(tx, state.clone(), 1); - assert!(!queue.submit(make_dummy_job("overflow"))); - // Depth should be 0 — enqueue was rolled back. - assert_eq!(state.snapshot().queue_depth, 0); + fn fixture_job(title: &str) -> IngestionJob { + IngestionJob { + document_id: format!("doc-{title}"), + document: NamespaceDocumentInput { + namespace: "skill-test".to_string(), + key: title.to_string(), + title: title.to_string(), + content: "body".to_string(), + source_type: "doc".to_string(), + priority: "medium".to_string(), + tags: Vec::new(), + metadata: json!({}), + category: "core".to_string(), + session_id: None, + document_id: None, + }, + config: MemoryIngestionConfig::default(), + } } #[tokio::test] - async fn submit_when_worker_gone_returns_false() { + async fn submit_succeeds_until_capacity_then_drops() { let state = IngestionState::new(); - let (tx, rx) = mpsc::channel::(4); - drop(rx); // simulate worker shutdown + let (tx, _rx) = mpsc::channel::(2); + let queue = IngestionQueue::from_parts(tx, state.clone(), 2); - let queue = IngestionQueue::from_parts(tx, state.clone(), 4); - assert!(!queue.submit(make_dummy_job("orphan"))); - assert_eq!(state.snapshot().queue_depth, 0); + assert!(queue.submit(fixture_job("a")), "first submit must enqueue"); + assert!(queue.submit(fixture_job("b")), "second submit must enqueue"); + + // Channel is now full. tokio's bounded mpsc reserves one slot per + // permit, so capacity=2 means at most two pending; the third must be + // rejected with `false`. + assert!( + !queue.submit(fixture_job("c")), + "submit at capacity must return false (drop)" + ); + + // queue_depth must reflect only the accepted jobs — the drop path + // is required to decrement so the status RPC does not drift upward. 
+ assert_eq!( + state.snapshot().queue_depth, + 2, + "queue_depth must roll back on overflow drop" + ); } - /// Verify that `submit()` succeeds again after transient backpressure is - /// relieved (the channel drains and a slot becomes available). #[tokio::test] - async fn submit_recovers_after_backpressure() { + async fn submit_recovers_after_drain() { let state = IngestionState::new(); - // Capacity-2 channel so we can fill one slot and still have headroom - // for the recovery submit. - let (tx, mut rx) = mpsc::channel::(2); + let (tx, mut rx) = mpsc::channel::(1); + let queue = IngestionQueue::from_parts(tx, state.clone(), 1); - // Pre-fill both slots directly to force the Full condition on submit. - tx.try_send(make_dummy_job("filler-a")).ok(); - tx.try_send(make_dummy_job("filler-b")).ok(); + assert!(queue.submit(fixture_job("first"))); + assert!( + !queue.submit(fixture_job("over")), + "second submit at cap=1 must drop" + ); - let queue = IngestionQueue::from_parts(tx, state.clone(), 2); + // Drain the receiver to free a slot. + let pulled = rx.try_recv().expect("first job must be readable"); + assert_eq!(pulled.document.title, "first"); + // Mirror the worker's accounting (queue depth -> dequeue) so the + // post-drain snapshot does not look like a leftover queued job. + state.dequeue(); - // Channel is now full — submit should return false and roll back depth. - assert!(!queue.submit(make_dummy_job("overflow"))); - assert_eq!( - state.snapshot().queue_depth, - 0, - "depth must be 0 after rejected submit" + assert!( + queue.submit(fixture_job("after-drain")), + "submit after drain must enqueue" ); + assert_eq!(state.snapshot().queue_depth, 1); + } - // Drain one slot to free up space. 
- let _ = rx.recv().await; + #[tokio::test] + async fn submit_after_worker_gone_returns_false() { + let state = IngestionState::new(); + let (tx, rx) = mpsc::channel::(4); + drop(rx); // simulate worker task exiting and dropping its receiver + let queue = IngestionQueue::from_parts(tx, state.clone(), 4); - // submit() should now succeed and increment queue_depth by 1. - assert!(queue.submit(make_dummy_job("recovered"))); + assert!( + !queue.submit(fixture_job("orphan")), + "submit must return false once the receiver is dropped" + ); assert_eq!( state.snapshot().queue_depth, - 1, - "depth must reflect the recovered enqueue" + 0, + "channel-closed drop path must roll the depth counter back" ); } - fn make_dummy_job(title: &str) -> IngestionJob { - use crate::openhuman::memory::ingestion::MemoryIngestionConfig; - use crate::openhuman::memory::store::types::NamespaceDocumentInput; - IngestionJob { - document_id: format!("doc-{title}"), - document: NamespaceDocumentInput { - namespace: "test".to_string(), - key: title.to_string(), - title: title.to_string(), - content: "body".to_string(), - source_type: "doc".to_string(), - priority: "normal".to_string(), - tags: vec![], - metadata: serde_json::Value::Null, - category: "core".to_string(), - session_id: None, - document_id: None, - }, - config: MemoryIngestionConfig::default(), - } + #[test] + fn default_queue_capacity_is_bounded_and_reasonable() { + // Guardrail so future changes don't accidentally regress to an + // arbitrarily large default (or `usize::MAX`) without thinking about + // the producer-side memory bound. 
+ assert!(DEFAULT_QUEUE_CAPACITY > 0); + assert!( + DEFAULT_QUEUE_CAPACITY <= 8 * 1024, + "default capacity is the memory ceiling under sustained overflow — keep it tight" + ); + } + + /// Zero capacity would otherwise panic from inside + /// `tokio::sync::mpsc::channel` with a cryptic Tokio-internal message + /// (`mpsc bounded channel requires buffer > 0`) — the explicit guard in + /// [`start_worker_with_capacity`] turns that into a clear, grep-friendly + /// assertion at the call site so misuse fails fast with an actionable + /// message instead of looking like a Tokio bug. + #[tokio::test] + #[should_panic(expected = "ingestion queue capacity must be greater than zero")] + async fn start_worker_rejects_zero_capacity() { + use crate::openhuman::embeddings::NoopEmbedding; + use tempfile::TempDir; + let tmp = TempDir::new().unwrap(); + let memory = UnifiedMemory::new(tmp.path(), Arc::new(NoopEmbedding), None).unwrap(); + // Panic must surface from our own assert, not from the Tokio + // channel constructor on the line after — that's the contract this + // test pins. + let _ = start_worker_with_capacity(Arc::new(memory), IngestionState::new(), 0); } }