Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 167 additions & 78 deletions src/openhuman/memory/ingestion/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
//! dedicated worker thread. This ensures that `doc_put` callers never block
//! on the heavier parsing and graph-write path.
//!
//! The queue uses a bounded `tokio::sync::mpsc` channel (capacity
//! [`DEFAULT_QUEUE_CAPACITY`] by default) to decouple document submission
//! from the actual extraction process. Producers call
//! [`IngestionQueue::submit`], which never blocks: when the buffer is full
//! the job is dropped and a warn-level message is logged, so a runaway
//! producer cannot grow the queue without bound and exhaust process memory.

use std::sync::Arc;
use std::time::Instant;
Expand All @@ -17,8 +21,20 @@ use super::MemoryIngestionConfig;
use crate::core::event_bus::{publish_global, DomainEvent};
use crate::openhuman::memory::store::{NamespaceDocumentInput, UnifiedMemory};

/// Default capacity of the bounded ingestion job channel.
///
/// Producers (`put_doc`, `store_skill_sync`) push jobs into this channel
/// without blocking; the worker drains them one at a time under the
/// `IngestionState` singleton lock because the local extraction LLM cannot
/// run concurrently. A buggy or compromised producer can submit jobs much
/// faster than the worker drains them, so the channel must enforce an
/// explicit cap; otherwise the queue grows without bound and exhausts process
/// memory (each [`IngestionJob`] holds an owned document body).
///
/// 512 is a deliberate middle ground: it absorbs realistic bulk-import
/// bursts (e.g. a bulk skill sync of ~200 docs, backfilling a Notion
/// workspace, or a long Slack history) without letting a runaway loop
/// balloon RSS. At typical document sizes of 1–100 KB, 512 queued jobs hold
/// at most roughly 50 MB of document bodies.
pub const DEFAULT_QUEUE_CAPACITY: usize = 512;

/// A job submitted to the ingestion worker.
Expand All @@ -38,11 +54,14 @@ pub struct IngestionJob {

/// Handle used by callers to submit ingestion jobs.
///
/// This is a thin wrapper around the `tokio::sync::mpsc::Sender` half of a
/// bounded channel and can be cloned freely to be shared across multiple
/// producers. The bound (see [`DEFAULT_QUEUE_CAPACITY`]) protects the core
/// from runaway producers: once the buffer is full, [`Self::submit`] returns
/// `false` instead of blocking or growing the queue.
#[derive(Clone)]
pub struct IngestionQueue {
/// Sender half of the job queue channel.
/// Sender half of the bounded job queue channel.
tx: mpsc::Sender<IngestionJob>,
/// Shared state — singleton lock, queue depth, status snapshot.
state: IngestionState,
Expand All @@ -63,24 +82,42 @@ impl IngestionQueue {
/// # Returns
///
/// Returns `true` if the job was successfully enqueued, `false` if the
/// queue is full (backpressure) or the worker has shut down.
/// queue is full (capacity reached) or the worker has shut down (e.g.,
/// during application termination). In both drop cases the job is not
/// persisted into the extraction pipeline — the underlying document
/// upsert that the caller already performed is unaffected. The queue
/// depth counter is restored before returning so the
/// `memory_ingestion_status` RPC stays accurate.
pub fn submit(&self, job: IngestionJob) -> bool {
self.state.enqueue();
match self.tx.try_send(job) {
Ok(()) => true,
Err(mpsc::error::TrySendError::Full(dropped)) => {
// Channel is at capacity — log loudly so observability can
// surface the drop, then undo the enqueue bump so the queue
// depth gauge does not drift upward forever under sustained
// overflow. Include the stable `document_id` so the warn
// line is the breadcrumb back to the upserted document
// whose graph-extraction follow-up was skipped.
self.state.dequeue();
log::warn!(
"[memory:ingestion_queue] queue full (capacity {}), dropping job: {}",
"[memory:ingestion_queue] dropping job: queue at capacity (cap={}) doc_id={} namespace={} title={}",
self.capacity,
dropped.document_id,
dropped.document.namespace,
dropped.document.title,
);
false
}
Err(mpsc::error::TrySendError::Closed(dropped)) => {
// Worker is gone — same accounting as the full case, but a
// different reason worth distinguishing in logs because it
// means the entire pipeline is dead, not just over-pressure.
self.state.dequeue();
log::warn!(
"[memory:ingestion_queue] failed to enqueue job (worker gone?): {}",
"[memory:ingestion_queue] dropping job: worker channel closed (shutdown?) doc_id={} namespace={} title={}",
dropped.document_id,
dropped.document.namespace,
dropped.document.title,
);
false
Expand Down Expand Up @@ -124,29 +161,37 @@ pub fn start_worker(memory: Arc<UnifiedMemory>) -> IngestionQueue {

/// Spawn the ingestion worker against a caller-provided [`IngestionState`].
///
/// Use this when the synchronous ingest path must share the queue worker's
/// singleton lock and status snapshot. The channel is created with
/// [`DEFAULT_QUEUE_CAPACITY`] slots.
pub fn start_worker_with_state(
    memory: Arc<UnifiedMemory>,
    state: IngestionState,
) -> IngestionQueue {
    let capacity = DEFAULT_QUEUE_CAPACITY;
    start_worker_with_capacity(memory, state, capacity)
}

/// Start a worker with an explicit channel capacity. Exposed for
/// deterministic tests that need a tiny queue to exercise backpressure.
pub fn start_worker_with_capacity(
/// Start a worker with an explicit channel capacity. Exposed so unit tests
/// can drive the at-capacity drop path deterministically without faking a
/// slow worker.
///
/// # Panics
///
/// Panics if `capacity == 0`. `tokio::sync::mpsc::channel` itself panics on
/// a zero buffer, but the message is cryptic; the explicit guard here turns
/// the misuse into a clear, grep-friendly assertion at the call site.
pub(crate) fn start_worker_with_capacity(
memory: Arc<UnifiedMemory>,
state: IngestionState,
capacity: usize,
) -> IngestionQueue {
assert!(
capacity > 0,
"ingestion queue capacity must be greater than zero"
);
let (tx, rx) = mpsc::channel::<IngestionJob>(capacity);
Comment thread
coderabbitai[bot] marked this conversation as resolved.

tokio::spawn(ingestion_worker(memory, rx, state.clone()));

log::debug!(
"[memory:ingestion_queue] background worker started (capacity={})",
capacity,
);
log::info!("[memory:ingestion_queue] background worker started capacity={capacity}");
IngestionQueue {
tx,
state,
Expand Down Expand Up @@ -244,88 +289,132 @@ async fn ingestion_worker(

#[cfg(test)]
mod tests {
//! Channel-bound tests. These build an [`IngestionQueue`] from a raw
//! `mpsc::channel` without spawning a worker — that lets the suite drive
//! the at-capacity and channel-closed branches deterministically without
//! standing up a real `UnifiedMemory` or contending with a draining task.
use super::*;
use tokio::sync::mpsc;

#[tokio::test]
async fn submit_when_full_returns_false() {
// Capacity-1 channel, fill it, then submit another — exercises the Full branch.
let state = IngestionState::new();
let (tx, _rx) = mpsc::channel::<IngestionJob>(1);
// Pre-fill the slot directly so submit() sees a full channel.
tx.try_send(make_dummy_job("filler")).ok();
use serde_json::json;

let queue = IngestionQueue::from_parts(tx, state.clone(), 1);
assert!(!queue.submit(make_dummy_job("overflow")));
// Depth should be 0 — enqueue was rolled back.
assert_eq!(state.snapshot().queue_depth, 0);
/// Build a minimal [`IngestionJob`] for the channel tests.
///
/// `title` is used as both the document key and title, and is embedded in
/// the job's `document_id` as `doc-{title}`; every other field is a fixed
/// placeholder.
fn fixture_job(title: &str) -> IngestionJob {
    let name = String::from(title);
    let document = NamespaceDocumentInput {
        namespace: String::from("skill-test"),
        key: name.clone(),
        title: name,
        content: String::from("body"),
        source_type: String::from("doc"),
        priority: String::from("medium"),
        tags: vec![],
        metadata: json!({}),
        category: String::from("core"),
        session_id: None,
        document_id: None,
    };

    IngestionJob {
        document_id: format!("doc-{title}"),
        document,
        config: MemoryIngestionConfig::default(),
    }
}

#[tokio::test]
async fn submit_when_worker_gone_returns_false() {
async fn submit_succeeds_until_capacity_then_drops() {
let state = IngestionState::new();
let (tx, rx) = mpsc::channel::<IngestionJob>(4);
drop(rx); // simulate worker shutdown
let (tx, _rx) = mpsc::channel::<IngestionJob>(2);
let queue = IngestionQueue::from_parts(tx, state.clone(), 2);

let queue = IngestionQueue::from_parts(tx, state.clone(), 4);
assert!(!queue.submit(make_dummy_job("orphan")));
assert_eq!(state.snapshot().queue_depth, 0);
assert!(queue.submit(fixture_job("a")), "first submit must enqueue");
assert!(queue.submit(fixture_job("b")), "second submit must enqueue");

// Channel is now full. tokio's bounded mpsc reserves one slot per
// permit, so capacity=2 means at most two pending; the third must be
// rejected with `false`.
assert!(
!queue.submit(fixture_job("c")),
"submit at capacity must return false (drop)"
);

// queue_depth must reflect only the accepted jobs — the drop path
// is required to decrement so the status RPC does not drift upward.
assert_eq!(
state.snapshot().queue_depth,
2,
"queue_depth must roll back on overflow drop"
);
}

/// Verify that `submit()` succeeds again after transient backpressure is
/// relieved (the channel drains and a slot becomes available).
#[tokio::test]
async fn submit_recovers_after_backpressure() {
async fn submit_recovers_after_drain() {
let state = IngestionState::new();
// Capacity-2 channel so we can fill one slot and still have headroom
// for the recovery submit.
let (tx, mut rx) = mpsc::channel::<IngestionJob>(2);
let (tx, mut rx) = mpsc::channel::<IngestionJob>(1);
let queue = IngestionQueue::from_parts(tx, state.clone(), 1);

// Pre-fill both slots directly to force the Full condition on submit.
tx.try_send(make_dummy_job("filler-a")).ok();
tx.try_send(make_dummy_job("filler-b")).ok();
assert!(queue.submit(fixture_job("first")));
assert!(
!queue.submit(fixture_job("over")),
"second submit at cap=1 must drop"
);

let queue = IngestionQueue::from_parts(tx, state.clone(), 2);
// Drain the receiver to free a slot.
let pulled = rx.try_recv().expect("first job must be readable");
assert_eq!(pulled.document.title, "first");
// Mirror the worker's accounting (queue depth -> dequeue) so the
// post-drain snapshot does not look like a leftover queued job.
state.dequeue();

// Channel is now full — submit should return false and roll back depth.
assert!(!queue.submit(make_dummy_job("overflow")));
assert_eq!(
state.snapshot().queue_depth,
0,
"depth must be 0 after rejected submit"
assert!(
queue.submit(fixture_job("after-drain")),
"submit after drain must enqueue"
);
assert_eq!(state.snapshot().queue_depth, 1);
}

// Drain one slot to free up space.
let _ = rx.recv().await;
#[tokio::test]
async fn submit_after_worker_gone_returns_false() {
let state = IngestionState::new();
let (tx, rx) = mpsc::channel::<IngestionJob>(4);
drop(rx); // simulate worker task exiting and dropping its receiver
let queue = IngestionQueue::from_parts(tx, state.clone(), 4);

// submit() should now succeed and increment queue_depth by 1.
assert!(queue.submit(make_dummy_job("recovered")));
assert!(
!queue.submit(fixture_job("orphan")),
"submit must return false once the receiver is dropped"
);
assert_eq!(
state.snapshot().queue_depth,
1,
"depth must reflect the recovered enqueue"
0,
"channel-closed drop path must roll the depth counter back"
);
}

fn make_dummy_job(title: &str) -> IngestionJob {
use crate::openhuman::memory::ingestion::MemoryIngestionConfig;
use crate::openhuman::memory::store::types::NamespaceDocumentInput;
IngestionJob {
document_id: format!("doc-{title}"),
document: NamespaceDocumentInput {
namespace: "test".to_string(),
key: title.to_string(),
title: title.to_string(),
content: "body".to_string(),
source_type: "doc".to_string(),
priority: "normal".to_string(),
tags: vec![],
metadata: serde_json::Value::Null,
category: "core".to_string(),
session_id: None,
document_id: None,
},
config: MemoryIngestionConfig::default(),
}
/// Guardrail on [`DEFAULT_QUEUE_CAPACITY`]: it must stay positive and small.
///
/// The default is the producer-side memory bound, so this fails if a future
/// change raises it to an arbitrarily large value (or `usize::MAX`).
#[test]
fn default_queue_capacity_is_bounded_and_reasonable() {
    // Upper limit tolerated for the default before someone has to justify it.
    let ceiling: usize = 8 * 1024;

    assert!(DEFAULT_QUEUE_CAPACITY > 0);
    assert!(
        DEFAULT_QUEUE_CAPACITY <= ceiling,
        "default capacity is the memory ceiling under sustained overflow — keep it tight"
    );
}

/// A zero capacity must be rejected by the explicit guard in
/// [`start_worker_with_capacity`].
///
/// Without that guard the panic would come from inside
/// `tokio::sync::mpsc::channel` with a Tokio-internal message
/// (`mpsc bounded channel requires buffer > 0`); the assertion makes misuse
/// fail fast with a clear, grep-friendly message at the call site instead of
/// looking like a Tokio bug.
#[tokio::test]
#[should_panic(expected = "ingestion queue capacity must be greater than zero")]
async fn start_worker_rejects_zero_capacity() {
    use crate::openhuman::embeddings::NoopEmbedding;
    use tempfile::TempDir;

    let scratch = TempDir::new().unwrap();
    let store = UnifiedMemory::new(scratch.path(), Arc::new(NoopEmbedding), None).unwrap();
    let shared_store = Arc::new(store);
    let state = IngestionState::new();

    // The `expected` string above pins the contract: the panic has to be our
    // own assertion, not the Tokio channel constructor that runs after it.
    let _ = start_worker_with_capacity(shared_store, state, 0);
}
}
Loading