diff --git a/.claude/memory.md b/.claude/memory.md index 684b1b235c..4c0006a796 100644 --- a/.claude/memory.md +++ b/.claude/memory.md @@ -258,6 +258,14 @@ Quick reference for anyone starting with Claude on this project. Updated by the - **`composio::action_tool` and `agent::harness::session::turn` intermittent failures** — These tests fail randomly when run as part of the full suite (likely shared state or timing), but pass individually. Not related to security/policy changes. Do not treat as blockers for security-module PRs. +## Windows OAuth Deep Link (Issue #2562) + +- **Three-layer fix**: (1) named-pipe IPC in `deep_link_ipc_windows.rs` — secondary process forwards `openhuman://` URL to primary via `\\.\pipe\com.openhuman.app-deeplink`, 40 retries × 50ms; (2) loopback OAuth server in `loopback_oauth.rs` — RFC 8252 one-shot `127.0.0.1:53824`, preferred path that eliminates deep link dispatch entirely; (3) Linux analog in `deep_link_ipc.rs` — Unix domain socket at `$XDG_RUNTIME_DIR/com.openhuman.app-deeplink.sock`. +- **`OAuthProviderButton.tsx` loopback flow** — tries loopback first, sets `redirectUri` for backend, awaits callback, rewrites `http://127.0.0.1:PORT/auth?...` → `openhuman://auth?...` → `handleDeepLinkUrls`. Falls back to deep link if bind fails. +- **Pipe binding location** — primary binds the named pipe in `lib.rs` right after the mutex guard (line 2269); `drain_pending_urls()` wired in `setup()` at line 2578. +- **Issue was already fixed before we picked it up** — PRs #2469, #2511, #2550 had already merged the fix. Our contribution was extracting `classify_request` as a pure function and adding 11 Rust unit tests. +- **Pure-function extraction pattern** — when async/AppHandle-gated Tauri code is untestable, extract a `classify_request(head, expected_state, bound_port) -> RequestOutcome` pure function returning an enum. Enables comprehensive unit tests with zero Tauri context. `RequestOutcome` has 4 variants: `AuthCallback`, `StateMismatch`, `NotFound`, `MethodNotAllowed`. + ## Port Conflict Recovery (Issue #2617) - **Port fallback already in `pick_listen_port`** — `src/openhuman/connectivity/rpc.rs` tries ports 7789–7798 when 7788 is busy. Gap was: frontend `getCoreRpcUrl()` cached the URL on first resolution so it never picked up the fallback port, and stale-process reaping was macOS-only. diff --git a/app/src-tauri/src/loopback_oauth.rs b/app/src-tauri/src/loopback_oauth.rs index c4b9d01ef2..e7e68ae57b 100644 --- a/app/src-tauri/src/loopback_oauth.rs +++ b/app/src-tauri/src/loopback_oauth.rs @@ -160,6 +160,45 @@ fn extract_state(query: &str) -> Option<&str> { .map(|(_, v)| v) } +/// Outcome of classifying one HTTP request against the loopback accept loop. +/// Extracted so routing logic can be unit-tested without a live `AppHandle`. +#[derive(Debug, PartialEq)] +enum RequestOutcome { + /// `/auth` matched and state is valid. Caller should send 200, emit callback. + AuthCallback { callback_url: String }, + /// `/auth` matched but `state=` was missing or wrong. Caller sends 400. + StateMismatch, + /// Path is not `/auth`. Caller sends 404. + NotFound, + /// Method is not GET. Caller sends 405. + MethodNotAllowed, +} + +/// Classify one HTTP/1.x request received by the loopback accept loop. +fn classify_request(head: &str, expected_state: &str, bound_port: u16) -> RequestOutcome { + let target = match parse_request_target(head) { + Some(t) => t.to_string(), + None => return RequestOutcome::MethodNotAllowed, + }; + + let (path, query) = match target.split_once('?') { + Some((p, q)) => (p, q), + None => (target.as_str(), ""), + }; + + if path != "/auth" { + return RequestOutcome::NotFound; + } + + match extract_state(query) { + Some(s) if s == expected_state => { + let callback_url = format!("http://127.0.0.1:{bound_port}{target}"); + RequestOutcome::AuthCallback { callback_url } + } + _ => RequestOutcome::StateMismatch, + } +} + const SUCCESS_BODY: &str = "Signed in\ \

You're signed in.

\ @@ -291,41 +330,36 @@ async fn run_accept_loop( } let head = String::from_utf8_lossy(&buf[..read]); - let target = match parse_request_target(&head) { - Some(t) => t.to_string(), - None => { - let _ = socket.write_all(&http_response("405 Method Not Allowed", "method not allowed")).await; - continue; + match classify_request(&head, &expected_state, bound_port) { + RequestOutcome::MethodNotAllowed => { + let _ = socket + .write_all(&http_response("405 Method Not Allowed", "method not allowed")) + .await; } - }; - - let (path, query) = match target.split_once('?') { - Some((p, q)) => (p, q), - None => (target.as_str(), ""), - }; - - if path != "/auth" { - let _ = socket.write_all(&http_response("404 Not Found", "not found")).await; - continue; - } - - match extract_state(query) { - Some(s) if s == expected_state => {} - _ => { - log::warn!("[loopback-oauth] /auth with missing or mismatched state — ignoring"); - let _ = socket.write_all(&http_response("400 Bad Request", "state mismatch")).await; - continue; + RequestOutcome::NotFound => { + let _ = socket + .write_all(&http_response("404 Not Found", "not found")) + .await; + } + RequestOutcome::StateMismatch => { + log::warn!( + "[loopback-oauth] /auth with missing or mismatched state — ignoring" + ); + let _ = socket + .write_all(&http_response("400 Bad Request", "state mismatch")) + .await; + } + RequestOutcome::AuthCallback { callback_url } => { + let _ = socket.write_all(&http_response("200 OK", SUCCESS_BODY)).await; + let _ = socket.flush().await; + if let Err(err) = + app.emit(LOOPBACK_CALLBACK_EVENT, CallbackPayload { url: callback_url }) + { + log::warn!("[loopback-oauth] emit callback event failed: {err}"); + } + return; } } - - let _ = socket.write_all(&http_response("200 OK", SUCCESS_BODY)).await; - let _ = socket.flush().await; - - let callback_url = format!("http://127.0.0.1:{}{}", bound_port, target); - if let Err(err) = app.emit(LOOPBACK_CALLBACK_EVENT, CallbackPayload { url: callback_url }) { - log::warn!("[loopback-oauth] emit callback event failed: {err}"); - } - return; } } } @@ -364,4 +398,121 @@ mod tests { assert_eq!(s.len(), 32); assert!(s.chars().all(|c| c.is_ascii_hexdigit())); } + + // ── classify_request ──────────────────────────────────────────────────── + + fn auth_head(query: &str) -> String { + format!("GET /auth{query} HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n") + } + + #[test] + fn classify_valid_auth_request_returns_callback_url() { + let head = auth_head("?token=jwt&state=deadbeef"); + let outcome = classify_request(&head, "deadbeef", 53824); + assert_eq!( + outcome, + RequestOutcome::AuthCallback { + callback_url: "http://127.0.0.1:53824/auth?token=jwt&state=deadbeef".to_string() + } + ); + } + + #[test] + fn classify_wrong_state_returns_state_mismatch() { + let head = auth_head("?token=jwt&state=wrong"); + assert_eq!( + classify_request(&head, "correct", 53824), + RequestOutcome::StateMismatch + ); + } + + #[test] + fn classify_missing_state_returns_state_mismatch() { + let head = auth_head("?token=jwt"); + assert_eq!( + classify_request(&head, "expected", 53824), + RequestOutcome::StateMismatch + ); + } + + #[test] + fn classify_no_query_string_on_auth_path_returns_state_mismatch() { + let head = "GET /auth HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n"; + assert_eq!( + classify_request(head, "nonce", 53824), + RequestOutcome::StateMismatch + ); + } + + #[test] + fn classify_favicon_returns_not_found() { + let head = "GET /favicon.ico HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n"; + assert_eq!( + classify_request(head, "state", 53824), + RequestOutcome::NotFound + ); + } + + #[test] + fn classify_root_path_returns_not_found() { + let head = "GET / HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n"; + assert_eq!( + classify_request(head, "state", 53824), + RequestOutcome::NotFound + ); + } + + #[test] + fn classify_post_method_returns_method_not_allowed() { + let head = "POST /auth?state=abc HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n"; + assert_eq!( + classify_request(head, "abc", 53824), + RequestOutcome::MethodNotAllowed + ); + } + + #[test] + fn classify_callback_url_uses_bound_port() { + let head = auth_head("?state=s&token=t"); + let outcome = classify_request(&head, "s", 12345); + assert_eq!( + outcome, + RequestOutcome::AuthCallback { + callback_url: "http://127.0.0.1:12345/auth?state=s&token=t".to_string() + } + ); + } + + #[test] + fn classify_state_only_query_returns_callback() { + // Minimal valid request: only state param, no other query params. + let head = auth_head("?state=abc123"); + assert_eq!( + classify_request(&head, "abc123", 53824), + RequestOutcome::AuthCallback { + callback_url: "http://127.0.0.1:53824/auth?state=abc123".to_string() + } + ); + } + + // ── bind_loopback (integration: real OS socket) ───────────────────────── + + #[tokio::test] + async fn bind_loopback_succeeds_on_ephemeral_port() { + let listener = bind_loopback(0).expect("bind on port 0 must succeed"); + let addr = listener.local_addr().expect("must have local addr"); + assert!(addr.ip().is_loopback()); + assert_ne!(addr.port(), 0, "OS should assign a non-zero ephemeral port"); + } + + #[tokio::test] + async fn bind_loopback_allows_rebind_via_so_reuseaddr() { + // Bind once, drop the listener, then bind again on the same port. The + // short TIME_WAIT window should not block the rebind because we set + // SO_REUSEADDR. + let listener = bind_loopback(0).expect("first bind"); + let port = listener.local_addr().unwrap().port(); + drop(listener); + let _ = bind_loopback(port).expect("rebind on same port must succeed with SO_REUSEADDR"); + } } diff --git a/app/test/e2e/specs/mega-flow.spec.ts b/app/test/e2e/specs/mega-flow.spec.ts index 8b59427341..6a3a05d708 100644 --- a/app/test/e2e/specs/mega-flow.spec.ts +++ b/app/test/e2e/specs/mega-flow.spec.ts @@ -100,11 +100,17 @@ async function resetEverything(label: string): Promise { headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({}), }).catch(() => {}); - clearRequestLog(); resetMockBehavior(); - // Settle a beat so any in-flight reactive HTTP calls don't bleed into the - // next scenario's request log. + // Settle a beat so any in-flight reactive HTTP calls from the previous + // scenario arrive and can be discarded — then clear AFTER the pause so + // stale requests don't appear in the next scenario's waitForMockRequest + // polls. (Clearing before the pause caused a race: in-flight /auth/me or + // /telegram/login-tokens/ requests from scenario N arrived during the + // 800 ms window, landed in the fresh log, and were matched by scenario + // N+1's waitForMockRequest — causing composio RPCs to fire before the + // new deep-link's auth flow completed.) await browser.pause(800); + clearRequestLog(); } describe('Mega flow — login + Gmail OAuth + Composio in one session', () => { diff --git a/src/openhuman/memory/ops/documents.rs b/src/openhuman/memory/ops/documents.rs index 1b89c80c5c..9314f81bf3 100644 --- a/src/openhuman/memory/ops/documents.rs +++ b/src/openhuman/memory/ops/documents.rs @@ -492,63 +492,12 @@ pub async fn memory_recall_memories( #[cfg(test)] mod tests { - use std::path::PathBuf; - use std::sync::OnceLock; - use serde_json::json; - use tempfile::TempDir; use super::*; - /// Held for the whole test: pins `OPENHUMAN_WORKSPACE` at a stable, - /// never-torn-down workspace under `TEST_ENV_LOCK`. These tests call - /// `memory_init` → `current_workspace_dir` → `Config::load_or_init`, which - /// reads that process-global env var; without this, a concurrent - /// env-mutating test can swap the var and tear down its tempdir mid-call, - /// yielding `SQLITE_IOERR` / config atomic-replace `ENOENT`. - struct WorkspaceEnvGuard { - _lock: std::sync::MutexGuard<'static, ()>, - previous: Option, - } - - impl WorkspaceEnvGuard { - fn set(path: &std::path::Path) -> Self { - let lock = crate::openhuman::config::TEST_ENV_LOCK - .lock() - .unwrap_or_else(|e| e.into_inner()); - let previous = std::env::var_os("OPENHUMAN_WORKSPACE"); - std::env::set_var("OPENHUMAN_WORKSPACE", path); - Self { - _lock: lock, - previous, - } - } - } - - impl Drop for WorkspaceEnvGuard { - fn drop(&mut self) { - if let Some(previous) = self.previous.as_ref() { - std::env::set_var("OPENHUMAN_WORKSPACE", previous); - } else { - std::env::remove_var("OPENHUMAN_WORKSPACE"); - } - } - } - - #[must_use] - fn ensure_memory_client() -> WorkspaceEnvGuard { - static WORKSPACE: OnceLock = OnceLock::new(); - let workspace = WORKSPACE.get_or_init(|| { - let tmp = TempDir::new().expect("tempdir"); - let path = tmp.path().join("workspace"); - std::fs::create_dir_all(&path).expect("workspace dir"); - std::mem::forget(tmp); - path - }); - // Pin the env BEFORE init so config load/save targets the stable dir. - let env_guard = WorkspaceEnvGuard::set(workspace); - let _ = crate::openhuman::memory::global::init(workspace.clone()); - env_guard + fn ensure_memory_client() { + crate::openhuman::memory::ops::ensure_shared_memory_client(); } fn unique_namespace(prefix: &str) -> String { diff --git a/src/openhuman/memory/ops/kv_graph.rs b/src/openhuman/memory/ops/kv_graph.rs index 2405460fb5..09f9bd4c51 100644 --- a/src/openhuman/memory/ops/kv_graph.rs +++ b/src/openhuman/memory/ops/kv_graph.rs @@ -137,23 +137,10 @@ pub async fn graph_query( #[cfg(test)] mod tests { - use std::path::PathBuf; - use std::sync::OnceLock; - - use tempfile::TempDir; - use super::*; fn ensure_memory_client() { - static WORKSPACE: OnceLock = OnceLock::new(); - let workspace = WORKSPACE.get_or_init(|| { - let tmp = TempDir::new().expect("tempdir"); - let path = tmp.path().join("workspace"); - std::fs::create_dir_all(&path).expect("workspace dir"); - std::mem::forget(tmp); - path - }); - let _ = crate::openhuman::memory::global::init(workspace.clone()); + crate::openhuman::memory::ops::ensure_shared_memory_client(); } fn unique_namespace(prefix: &str) -> String { diff --git a/src/openhuman/memory/ops/learn.rs b/src/openhuman/memory/ops/learn.rs index d3f3ca6fdf..40691c4dfc 100644 --- a/src/openhuman/memory/ops/learn.rs +++ b/src/openhuman/memory/ops/learn.rs @@ -157,7 +157,6 @@ pub async fn memory_learn_all( mod tests { use std::ffi::OsString; use std::path::PathBuf; - use std::sync::OnceLock; use serde_json::json; use tempfile::TempDir; @@ -166,15 +165,7 @@ mod tests { use crate::openhuman::memory_store::NamespaceDocumentInput; fn ensure_memory_client() { - static WORKSPACE: OnceLock = OnceLock::new(); - let workspace = WORKSPACE.get_or_init(|| { - let tmp = TempDir::new().expect("tempdir"); - let path = tmp.path().join("workspace"); - std::fs::create_dir_all(&path).expect("workspace dir"); - std::mem::forget(tmp); - path - }); - let _ = crate::openhuman::memory::global::init(workspace.clone()); + crate::openhuman::memory::ops::ensure_shared_memory_client(); } struct WorkspaceEnvGuard { diff --git a/src/openhuman/memory/ops/mod.rs b/src/openhuman/memory/ops/mod.rs index d7b2c737e6..d177e1262e 100644 --- a/src/openhuman/memory/ops/mod.rs +++ b/src/openhuman/memory/ops/mod.rs @@ -79,6 +79,11 @@ pub(crate) use helpers::{ pub(crate) static GLOBAL_MEMORY_TEST_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(()); +#[cfg(test)] +mod test_support; +#[cfg(test)] +pub(crate) use test_support::ensure_shared_memory_client; + #[cfg(test)] #[path = "../ops_tests.rs"] mod tests; diff --git a/src/openhuman/memory/ops/sync.rs b/src/openhuman/memory/ops/sync.rs index b0a5c77df8..916cbbfe4f 100644 --- a/src/openhuman/memory/ops/sync.rs +++ b/src/openhuman/memory/ops/sync.rs @@ -228,13 +228,11 @@ pub async fn memory_ingestion_status() -> Result crate::openhuman::memory_store::MemoryClientRef { - static WORKSPACE: OnceLock = OnceLock::new(); - let workspace = WORKSPACE.get_or_init(|| { - let tmp = TempDir::new().expect("tempdir"); - let path = tmp.path().join("workspace"); - std::fs::create_dir_all(&path).expect("workspace dir"); - std::mem::forget(tmp); - path - }); - crate::openhuman::memory::global::init(workspace.clone()).expect("init memory client") + crate::openhuman::memory::ops::ensure_shared_memory_client(); + crate::openhuman::memory::global::client().expect("memory client") } struct ChannelCapture { diff --git a/src/openhuman/memory/ops/test_support.rs b/src/openhuman/memory/ops/test_support.rs new file mode 100644 index 0000000000..ce543a93c5 --- /dev/null +++ b/src/openhuman/memory/ops/test_support.rs @@ -0,0 +1,28 @@ +//! Shared test infrastructure for `memory::ops` submodule tests. +//! +//! All `ops` submodules that need a global [`MemoryClient`] call +//! [`ensure_shared_memory_client`] instead of creating their own +//! `OnceLock`. Sharing one leaked workspace means concurrent +//! `global::init()` calls always resolve to the same path and hit the +//! no-op fast-path inside `init_in_slot`, preventing one test thread +//! from silently rebinding the global under another thread's feet. + +use std::path::PathBuf; +use std::sync::OnceLock; + +/// Binds the process-global memory client to a single shared temp workspace. +/// +/// Safe to call from multiple test threads concurrently — subsequent calls with +/// the same workspace path return the existing client without rebinding. +pub(crate) fn ensure_shared_memory_client() { + static WORKSPACE: OnceLock = OnceLock::new(); + let workspace = WORKSPACE.get_or_init(|| { + let tmp = tempfile::TempDir::new().expect("tempdir"); + let path = tmp.path().join("workspace"); + std::fs::create_dir_all(&path).expect("workspace dir"); + std::mem::forget(tmp); + path + }); + crate::openhuman::memory::global::init(workspace.clone()) + .expect("initialize shared test memory client"); +} diff --git a/src/openhuman/memory/ops/tool_memory.rs b/src/openhuman/memory/ops/tool_memory.rs index ccc310b782..c68868a713 100644 --- a/src/openhuman/memory/ops/tool_memory.rs +++ b/src/openhuman/memory/ops/tool_memory.rs @@ -175,24 +175,11 @@ pub async fn tool_rules_json(params: ToolRuleListParams) -> Result = OnceLock::new(); - let workspace = WORKSPACE.get_or_init(|| { - let tmp = TempDir::new().expect("tempdir"); - let path = tmp.path().join("workspace"); - std::fs::create_dir_all(&path).expect("workspace dir"); - std::mem::forget(tmp); - path - }); - let _ = crate::openhuman::memory::global::init(workspace.clone()); + crate::openhuman::memory::ops::ensure_shared_memory_client(); } fn unique_tool_name() -> String { diff --git a/src/openhuman/memory/query/mod.rs b/src/openhuman/memory/query/mod.rs index bf677e71c1..67af0f5188 100644 --- a/src/openhuman/memory/query/mod.rs +++ b/src/openhuman/memory/query/mod.rs @@ -270,6 +270,14 @@ mod memory_tree_dispatcher_tests { #[tokio::test] async fn memory_tree_query_global_mode_dispatches_successfully() { + // Hold TEST_ENV_LOCK for the duration of the test so that concurrent + // tests using WorkspaceEnvGuard (which sets OPENHUMAN_WORKSPACE to a + // temp path and then deletes it) cannot race with the Config::load_or_init + // call inside MemoryTreeTool.execute — the race caused "Failed to read + // config file" when the temp dir was deleted between exists() and read(). + let _env_guard = crate::openhuman::config::TEST_ENV_LOCK + .lock() + .unwrap_or_else(|e| e.into_inner()); let result = MemoryTreeTool .execute(json!({ "mode": "query_global",