Feat/2358 clear reembed skipped #2443
📝 Walkthrough
This PR adds a persistent tombstone system to prevent infinite re-embed backfill retries on rows with terminal failures, migrates RPC pending/processed accounting to the sidecar embedding table, fixes SQLite ingest transaction locking, and improves Windows dev-server pnpm discovery.
Changes
Re-embed backfill runaway-loop fix with terminal failure tombstones
Supporting infrastructure fixes and migrations
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 5 passed
Warning: Review ran into problems
🔥 Problems
Git: Failed to clone repository.
Actionable comments posted: 1
🧹 Nitpick comments (4)
src/openhuman/memory/tree/jobs/mod.rs (1)
76-97: ⚡ Quick win
Extract this uncovered-work probe into one shared helper.
This predicate now exists here, in `migrate_legacy_embeddings_to_sidecar`, and again in the regression tests. If one copy drifts, the migration path and the runtime trigger will disagree about whether backfill is complete, which is exactly how this runaway-loop fix regresses. A single `has_uncovered_reembed_work(conn, sig)` helper would keep that contract in one place.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/memory/tree/jobs/mod.rs` around lines 76 - 97, Extract the uncovered-work SQL probe into a single helper function (e.g., has_uncovered_reembed_work(conn: &Connection, sig: &str) -> rusqlite::Result<bool>) and replace the duplicated inline query in this file and in migrate_legacy_embeddings_to_sidecar and the regression tests with calls to that helper; move the exact SQL string and rusqlite::params![sig] + row extraction (r.get(0)) into the helper so the predicate logic is centralized and return the boolean result, keeping all callers' error handling the same.
src/openhuman/memory/tree/jobs/handlers/mod.rs (1)
668-696: ⚡ Quick win
Don't swallow tombstone persistence failures.
`let _ = mark_*_reembed_skipped(...)` drops the one error that explains why the same row keeps reappearing. If the sentinel write fails, this handler falls back to retrying that row indefinitely at the current signature, but the logs only show the original body/embed failure. Please at least `warn!` on a failed tombstone write with the id and signature.
♻️ Minimal pattern
- let _ = chunk_store::mark_chunk_reembed_skipped(
-     config,
-     id,
-     &active_sig,
-     &format!("embed failed: {e}"),
- );
+ if let Err(mark_err) = chunk_store::mark_chunk_reembed_skipped(
+     config,
+     id,
+     &active_sig,
+     &format!("embed failed: {e}"),
+ ) {
+     log::warn!(
+         "[memory_tree::jobs] reembed_backfill: failed to persist chunk tombstone chunk_id={} sig={}: {mark_err}",
+         id,
+         active_sig,
+     );
+ }
As per coding guidelines "Use log / tracing at debug or trace level on RPC entry and exit, error paths, state transitions, and any branch that is hard to infer from tests alone. Use structured, grep-friendly context with stable prefixes..."
Also applies to: 709-737
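The "check the Result, then warn" pattern suggested above can be isolated into a small helper. The following is a minimal sketch only: `mark_chunk_reembed_skipped_stub` is a hypothetical stand-in for the real store call (whose signature takes a config and writes to SQLite), `warn_if_tombstone_failed` is an illustrative name that does not appear in the PR, and `eprintln!` stands in for `log::warn!` so the block has no external dependencies.

```rust
use std::fmt::Display;

/// Hypothetical stand-in for `chunk_store::mark_chunk_reembed_skipped`.
/// It fails on an empty id or signature so the error path can be exercised.
fn mark_chunk_reembed_skipped_stub(chunk_id: &str, sig: &str, reason: &str) -> Result<(), String> {
    if chunk_id.is_empty() || sig.is_empty() {
        return Err(format!("invalid tombstone key (reason was: {})", reason));
    }
    Ok(())
}

/// Surfaces a failed tombstone write instead of discarding it with `let _ =`.
/// Returns the formatted warning so callers and tests can inspect it.
/// Assumption: real code would call `log::warn!` here rather than `eprintln!`.
fn warn_if_tombstone_failed<E: Display>(
    result: Result<(), E>,
    chunk_id: &str,
    sig: &str,
) -> Option<String> {
    match result {
        Ok(()) => None,
        Err(mark_err) => {
            let line = format!(
                "[memory_tree::jobs] reembed_backfill: failed to persist chunk tombstone chunk_id={} sig={}: {}",
                chunk_id, sig, mark_err
            );
            eprintln!("{}", line);
            Some(line)
        }
    }
}
```

A caller would wrap each tombstone write, for example `warn_if_tombstone_failed(mark_chunk_reembed_skipped_stub(id, sig, "embed failed"), id, sig)`, so that a row which keeps reappearing in the worklist leaves a grep-friendly trace with its id and signature.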
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/memory/tree/jobs/handlers/mod.rs` around lines 668 - 696, The mark_chunk_reembed_skipped calls are currently ignoring (let _ = ...) any error from persisting the tombstone sentinel; instead check the Result returned by chunk_store::mark_chunk_reembed_skipped (the calls that pass config, id, &active_sig, and a message like "embed failed: {e}" or "body read failed: {e}" and "embed wrong dim") and if it is Err, emit a warn! (including the chunk id and active_sig and the persistence error) so tombstone write failures are visible in logs; apply the same change to the other occurrences around reembed_backfill (including the calls referenced in the blocks for embed failure and body read failure).
src/openhuman/memory/tree/store.rs (1)
1202-1267: ⚡ Quick win
Add debug logs around tombstone mutations.
These helpers introduce operator-facing state transitions, but today a mark/clear/bulk-clear leaves no trace explaining why a row started or stopped appearing in the worklist. Please emit a stable `[memory_tree::store]` debug/trace log with the id/signature and bulk deleted count here, and mirror the same pattern in the summary helpers.
As per coding guidelines "Use log / tracing at debug or trace level on RPC entry and exit, error paths, state transitions, and any branch that is hard to infer from tests alone. Use structured, grep-friendly context with stable prefixes..."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/memory/tree/store.rs` around lines 1202 - 1267, The tombstone mutation helpers (mark_chunk_reembed_skipped, clear_chunk_reembed_skipped, clear_reembed_skipped_for_signature and the equivalent summary helpers) need operator-facing debug/trace logs: add a stable "[memory_tree::store]" debug/trace line in mark_chunk_reembed_skipped that logs chunk_id and model_signature (and reason on mark), in clear_chunk_reembed_skipped that logs chunk_id and model_signature when deleting, and in clear_reembed_skipped_for_signature that logs the model_signature and the returned bulk-deleted count; use the project's tracing/log facility at debug or trace level and emit structured, grep-friendly key=value pairs so state transitions are visible to operators.
src/openhuman/memory/tree/store_tests.rs (1)
366-400: ⚡ Quick win
Cover the summary table in the bulk-clear test.
`clear_reembed_skipped_for_signature` deletes from both tombstone tables, but this test only populates `mem_tree_chunk_reembed_skipped`, so a broken summary-side `DELETE` would still pass. Seed one summary tombstone and include it in the deleted-count/assertions.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/memory/tree/store_tests.rs` around lines 366 - 400, The test clear_reembed_skipped_for_signature_removes_all_tombstones_for_sig only seeds the per-chunk tombstone table and misses the summary-side tombstone; add a seeded row in the summary tombstone table (e.g., mem_tree_chunk_reembed_skipped_summary) for the same chunk and model signature after calling mark_chunk_reembed_skipped, then call clear_reembed_skipped_for_signature(&cfg, &sig) and update the expected deleted count to include the summary row (adjust assert_eq!(deleted, ...) accordingly) and add assertions that the summary table has 0 rows for that signature and that the per-chunk summary for the "other" signature is still preserved; reference the test fn clear_reembed_skipped_for_signature_removes_all_tombstones_for_sig, the helper mark_chunk_reembed_skipped, and the function clear_reembed_skipped_for_signature to locate where to seed and assert.
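The coverage gap in the bulk-clear test can be illustrated without SQLite. The sketch below is an in-memory model under stated assumptions: `TombstoneStore`, its field names, and its methods are hypothetical and only mimic the described behaviour of `clear_reembed_skipped_for_signature` (delete from both tombstone tables for one signature and return the combined count). It is not the PR's implementation.

```rust
use std::collections::HashSet;

/// In-memory stand-in for the two tombstone tables (illustrative names only).
/// Each entry is an (id, model_signature) pair.
#[derive(Default)]
struct TombstoneStore {
    chunk_skipped: HashSet<(String, String)>,
    summary_skipped: HashSet<(String, String)>,
}

impl TombstoneStore {
    fn mark_chunk(&mut self, id: &str, sig: &str) {
        self.chunk_skipped.insert((id.to_string(), sig.to_string()));
    }

    fn mark_summary(&mut self, id: &str, sig: &str) {
        self.summary_skipped.insert((id.to_string(), sig.to_string()));
    }

    /// Removes every tombstone for `sig` from both tables and returns the
    /// total number of rows deleted across the two.
    fn clear_for_signature(&mut self, sig: &str) -> usize {
        let before = self.chunk_skipped.len() + self.summary_skipped.len();
        self.chunk_skipped.retain(|entry| entry.1.as_str() != sig);
        self.summary_skipped.retain(|entry| entry.1.as_str() != sig);
        before - (self.chunk_skipped.len() + self.summary_skipped.len())
    }
}
```

A test that seeds only the chunk side would still pass if the second `retain` were deleted. Seeding one chunk and one summary tombstone per signature and asserting a deleted count of 2 for the cleared signature, with the other signature's rows preserved, would catch that regression.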
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f1694dbc-d768-4009-969f-e184880dfbdc
📒 Files selected for processing (8)
scripts/run-dev-win.sh
src/openhuman/memory/sync_status/rpc.rs
src/openhuman/memory/tree/ingest.rs
src/openhuman/memory/tree/jobs/handlers/mod.rs
src/openhuman/memory/tree/jobs/mod.rs
src/openhuman/memory/tree/store.rs
src/openhuman/memory/tree/store_tests.rs
src/openhuman/memory/tree/tree_source/store.rs
src/openhuman/memory/sync_status/rpc.rs (lines 137-156)
wave_anchors AS ( \
    SELECT p.provider, MIN(p.created_at_ms) AS anchor \
    FROM provider_chunks p \
    JOIN provider_max m ON p.provider = m.provider \
    JOIN provider_pending pp ON p.provider = pp.provider \
    WHERE pp.pending > 0 \
      AND p.created_at_ms >= m.max_created - ?1 \
    GROUP BY p.provider \
) \
SELECT \
    p.provider, \
    COUNT(*) AS chunks_synced, \
    SUM(CASE WHEN p.embedded = 0 THEN 1 ELSE 0 END) AS chunks_pending, \
    SUM(CASE WHEN w.anchor IS NOT NULL \
              AND p.created_at_ms >= w.anchor \
             THEN 1 ELSE 0 END) AS batch_total, \
    SUM(CASE WHEN w.anchor IS NOT NULL \
              AND p.created_at_ms >= w.anchor \
              AND p.embedded = 1 \
             THEN 1 ELSE 0 END) AS batch_processed, \
Only open an active wave when the current window still has unembedded chunks.
Right now pp.pending > 0 is provider-wide, so an old leftover pending chunk can make wave_anchors latch onto newer already-embedded chunks. In that case batch_total / batch_processed become non-zero and the UI shows a completed wave instead of "no active wave". Gate the anchor on pending rows inside m.max_created - ?1, and add a regression case for “old pending outside the window + newer embedded chunks”.
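The gating rule described above can be modelled outside SQL to make the regression case concrete. This is a hedged sketch for a single provider: the `Chunk` struct and the `wave_anchor` and `batch_counts` functions are illustrative names, not code from the PR, and `window_ms` plays the role of the `?1` parameter.

```rust
/// Minimal stand-in for one row of the `provider_chunks` CTE (one provider).
#[derive(Clone, Copy)]
struct Chunk {
    created_at_ms: i64,
    embedded: bool,
}

/// Returns the wave anchor (earliest `created_at_ms` inside the window) only
/// when at least one chunk inside that window is still unembedded.
fn wave_anchor(chunks: &[Chunk], window_ms: i64) -> Option<i64> {
    let max_created = chunks.iter().map(|c| c.created_at_ms).max()?;
    let cutoff = max_created - window_ms;
    let in_window: Vec<&Chunk> = chunks
        .iter()
        .filter(|c| c.created_at_ms >= cutoff)
        .collect();
    // Counterpart of HAVING SUM(CASE WHEN embedded = 0 THEN 1 ELSE 0 END) > 0.
    if !in_window.iter().any(|c| !c.embedded) {
        return None;
    }
    in_window.iter().map(|c| c.created_at_ms).min()
}

/// Returns (batch_total, batch_processed); both are 0 when no wave is active.
fn batch_counts(chunks: &[Chunk], window_ms: i64) -> (usize, usize) {
    match wave_anchor(chunks, window_ms) {
        None => (0, 0),
        Some(anchor) => {
            let total = chunks.iter().filter(|c| c.created_at_ms >= anchor).count();
            let processed = chunks
                .iter()
                .filter(|c| c.created_at_ms >= anchor && c.embedded)
                .count();
            (total, processed)
        }
    }
}
```

With an old pending chunk at `t = 0`, embedded chunks at `t = 10_000` and `t = 10_500`, and a 1_000 ms window, `wave_anchor` returns `None`, which is the "no active wave" outcome the comment asks for. A provider-wide pending check would instead anchor at 10_000 and report a completed wave.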
Suggested query adjustment
- provider_pending AS ( \
- SELECT provider, \
- SUM(CASE WHEN embedded = 0 THEN 1 ELSE 0 END) AS pending \
- FROM provider_chunks \
- GROUP BY provider \
- ), \
wave_anchors AS ( \
SELECT p.provider, MIN(p.created_at_ms) AS anchor \
FROM provider_chunks p \
JOIN provider_max m ON p.provider = m.provider \
- JOIN provider_pending pp ON p.provider = pp.provider \
- WHERE pp.pending > 0 \
- AND p.created_at_ms >= m.max_created - ?1 \
+ WHERE p.created_at_ms >= m.max_created - ?1 \
GROUP BY p.provider \
+ HAVING SUM(CASE WHEN p.embedded = 0 THEN 1 ELSE 0 END) > 0 \
 ) \
📝 Committable suggestion
wave_anchors AS ( \
    SELECT p.provider, MIN(p.created_at_ms) AS anchor \
    FROM provider_chunks p \
    JOIN provider_max m ON p.provider = m.provider \
    WHERE p.created_at_ms >= m.max_created - ?1 \
    GROUP BY p.provider \
    HAVING SUM(CASE WHEN p.embedded = 0 THEN 1 ELSE 0 END) > 0 \
) \
SELECT \
    p.provider, \
    COUNT(*) AS chunks_synced, \
    SUM(CASE WHEN p.embedded = 0 THEN 1 ELSE 0 END) AS chunks_pending, \
    SUM(CASE WHEN w.anchor IS NOT NULL \
              AND p.created_at_ms >= w.anchor \
             THEN 1 ELSE 0 END) AS batch_total, \
    SUM(CASE WHEN w.anchor IS NOT NULL \
              AND p.created_at_ms >= w.anchor \
              AND p.embedded = 1 \
             THEN 1 ELSE 0 END) AS batch_processed, \
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/openhuman/memory/sync_status/rpc.rs` around lines 137 - 156, Change the
wave detection so anchors are only opened when there is a pending row inside the
current time window: in the wave_anchors CTE (symbols: wave_anchors,
provider_chunks p, provider_max m, provider_pending pp) replace the
provider-wide pp.pending > 0 check with one evaluated over the windowed rows
only (e.g. HAVING SUM(CASE WHEN p.embedded = 0 THEN 1 ELSE 0 END) > 0 after
filtering on p.created_at_ms >= m.max_created - ?1), since provider_pending is
aggregated per provider and carries no timestamp; this prevents
old-outside-window pending rows from causing anchors on newer already-embedded
chunks and fixes batch_total/batch_processed showing a completed wave. Also add
a regression test that constructs an old pending row outside the window plus
newer embedded chunks and asserts no active wave is reported.
@YellowSnnowmann this PR has merge conflicts with main; please rebase/resolve before review.
@MrMrVlad this PR has merge conflicts with main; please rebase/resolve before review.
…inyhumansai#2358) Operators can remove chunk/summary tombstones after fixing moved workspaces or embedder misconfig without hand-editing SQLite. Validates id/signature inputs; bulk clear is scoped to one model_signature. Depends on PR tinyhumansai#2349 (tombstone tables). Co-authored-by: Cursor <cursoragent@cursor.com>
Fixes E0106 under Rust 1.93 so lib tests compile. Co-authored-by: Cursor <cursoragent@cursor.com>
2452942 to 9251e65
Summary
Adds clear_chunk_reembed_skipped, clear_summary_reembed_skipped, and clear_reembed_skipped_for_signature (#2358).
Input validation on mark/clear paths; library-only (no new RPC).
Depends on #2349.
Closes #2358
Local verification (2026-05-21)
cargo test reembed_skip reembed_backfill_tombstones --lib — 5 passed
bash scripts/test-rust-with-mock.sh --lib reembed_skip — passed
Lifetime fix for Rust 1.93 (validate_reembed_skip_key<'a>)
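For context on the lifetime fix, the sketch below shows why an explicit `'a` is needed on a validator of this kind. Only the name `validate_reembed_skip_key<'a>` comes from the PR description. The parameter list, the return type, and the trimming and empty-check rules are assumptions made for illustration.

```rust
/// Hypothetical shape of the key validator: trims both inputs and rejects
/// empty ones. With two reference parameters and references in the return
/// type, lifetime elision has no single source to pick from, so omitting
/// `'a` here is rejected with E0106 (missing lifetime specifier).
fn validate_reembed_skip_key<'a>(
    id: &'a str,
    model_signature: &'a str,
) -> Result<(&'a str, &'a str), String> {
    let id = id.trim();
    let model_signature = model_signature.trim();
    if id.is_empty() {
        return Err("reembed skip key: id must not be empty".to_string());
    }
    if model_signature.is_empty() {
        return Err("reembed skip key: model_signature must not be empty".to_string());
    }
    // Both returned slices borrow from the inputs, hence the shared `'a`.
    Ok((id, model_signature))
}
```

Tying both inputs and both outputs to one lifetime is the simplest annotation that compiles. Separate lifetimes per parameter would also work if callers ever need the two borrows to end at different times.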
Upstream
Open against tinyhumansai/openhuman:main:
main...MrMrVlad:feat/2358-clear-reembed-skipped