Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .claude/memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,14 @@ Quick reference for anyone starting with Claude on this project. Updated by the

- **`composio::action_tool` and `agent::harness::session::turn` intermittent failures** — These tests fail randomly when run as part of the full suite (likely shared state or timing), but pass individually. Not related to security/policy changes. Do not treat as blockers for security-module PRs.

## Windows OAuth Deep Link (Issue #2562)

- **Three-layer fix**: (1) named-pipe IPC in `deep_link_ipc_windows.rs` — secondary process forwards `openhuman://` URL to primary via `\\.\pipe\com.openhuman.app-deeplink`, 40 retries × 50ms; (2) loopback OAuth server in `loopback_oauth.rs` — RFC 8252 one-shot `127.0.0.1:53824`, preferred path that eliminates deep link dispatch entirely; (3) Linux analog in `deep_link_ipc.rs` — Unix domain socket at `$XDG_RUNTIME_DIR/com.openhuman.app-deeplink.sock`.
- **`OAuthProviderButton.tsx` loopback flow** — tries loopback first, sets `redirectUri` for backend, awaits callback, rewrites `http://127.0.0.1:PORT/auth?...` → `openhuman://auth?...` → `handleDeepLinkUrls`. Falls back to deep link if bind fails.
- **Pipe binding location** — primary binds the named pipe in `lib.rs` right after the mutex guard (line 2269); `drain_pending_urls()` wired in `setup()` at line 2578.
- **Issue was already fixed before we picked it up** — PRs #2469, #2511, #2550 had already merged the fix. Our contribution was extracting `classify_request` as a pure function and adding 11 Rust unit tests.
- **Pure-function extraction pattern** — when async/AppHandle-gated Tauri code is untestable, extract a `classify_request(head, expected_state, bound_port) -> RequestOutcome` pure function returning an enum. Enables comprehensive unit tests with zero Tauri context. `RequestOutcome` has 4 variants: `AuthCallback`, `StateMismatch`, `NotFound`, `MethodNotAllowed`.

## Port Conflict Recovery (Issue #2617)

- **Port fallback already in `pick_listen_port`** — `src/openhuman/connectivity/rpc.rs` tries ports 7789–7798 when 7788 is busy. Gap was: frontend `getCoreRpcUrl()` cached the URL on first resolution so it never picked up the fallback port, and stale-process reaping was macOS-only.
Expand Down
215 changes: 183 additions & 32 deletions app/src-tauri/src/loopback_oauth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,45 @@ fn extract_state(query: &str) -> Option<&str> {
.map(|(_, v)| v)
}

/// Outcome of classifying one HTTP request against the loopback accept loop.
/// Extracted so routing logic can be unit-tested without a live `AppHandle`.
#[derive(Debug, PartialEq)]
enum RequestOutcome {
/// `/auth` matched and state is valid. Caller should send 200, emit callback.
AuthCallback { callback_url: String },
/// `/auth` matched but `state=` was missing or wrong. Caller sends 400.
StateMismatch,
/// Path is not `/auth`. Caller sends 404.
NotFound,
/// Method is not GET. Caller sends 405.
MethodNotAllowed,
}

/// Classify one HTTP/1.x request received by the loopback accept loop.
fn classify_request(head: &str, expected_state: &str, bound_port: u16) -> RequestOutcome {
let target = match parse_request_target(head) {
Some(t) => t.to_string(),
None => return RequestOutcome::MethodNotAllowed,
};

let (path, query) = match target.split_once('?') {
Some((p, q)) => (p, q),
None => (target.as_str(), ""),
};

if path != "/auth" {
return RequestOutcome::NotFound;
}

match extract_state(query) {
Some(s) if s == expected_state => {
let callback_url = format!("http://127.0.0.1:{bound_port}{target}");
RequestOutcome::AuthCallback { callback_url }
}
_ => RequestOutcome::StateMismatch,
}
}

const SUCCESS_BODY: &str = "<!doctype html><meta charset=utf-8><title>Signed in</title>\
<body style=\"font-family:system-ui;display:flex;align-items:center;justify-content:center;height:100vh;margin:0;color:#1c1c1e;background:#f5f5f7\">\
<div style=\"text-align:center\"><h2 style=\"margin:0 0 8px\">You're signed in.</h2>\
Expand Down Expand Up @@ -291,41 +330,36 @@ async fn run_accept_loop(
}

let head = String::from_utf8_lossy(&buf[..read]);
let target = match parse_request_target(&head) {
Some(t) => t.to_string(),
None => {
let _ = socket.write_all(&http_response("405 Method Not Allowed", "method not allowed")).await;
continue;
match classify_request(&head, &expected_state, bound_port) {
RequestOutcome::MethodNotAllowed => {
let _ = socket
.write_all(&http_response("405 Method Not Allowed", "method not allowed"))
.await;
}
};

let (path, query) = match target.split_once('?') {
Some((p, q)) => (p, q),
None => (target.as_str(), ""),
};

if path != "/auth" {
let _ = socket.write_all(&http_response("404 Not Found", "not found")).await;
continue;
}

match extract_state(query) {
Some(s) if s == expected_state => {}
_ => {
log::warn!("[loopback-oauth] /auth with missing or mismatched state — ignoring");
let _ = socket.write_all(&http_response("400 Bad Request", "state mismatch")).await;
continue;
RequestOutcome::NotFound => {
let _ = socket
.write_all(&http_response("404 Not Found", "not found"))
.await;
}
RequestOutcome::StateMismatch => {
log::warn!(
"[loopback-oauth] /auth with missing or mismatched state — ignoring"
);
let _ = socket
.write_all(&http_response("400 Bad Request", "state mismatch"))
.await;
}
RequestOutcome::AuthCallback { callback_url } => {
let _ = socket.write_all(&http_response("200 OK", SUCCESS_BODY)).await;
let _ = socket.flush().await;
if let Err(err) =
app.emit(LOOPBACK_CALLBACK_EVENT, CallbackPayload { url: callback_url })
{
log::warn!("[loopback-oauth] emit callback event failed: {err}");
}
return;
}
}

let _ = socket.write_all(&http_response("200 OK", SUCCESS_BODY)).await;
let _ = socket.flush().await;

let callback_url = format!("http://127.0.0.1:{}{}", bound_port, target);
if let Err(err) = app.emit(LOOPBACK_CALLBACK_EVENT, CallbackPayload { url: callback_url }) {
log::warn!("[loopback-oauth] emit callback event failed: {err}");
}
return;
}
}
}
Expand Down Expand Up @@ -364,4 +398,121 @@ mod tests {
assert_eq!(s.len(), 32);
assert!(s.chars().all(|c| c.is_ascii_hexdigit()));
}

// ── classify_request ────────────────────────────────────────────────────

fn auth_head(query: &str) -> String {
format!("GET /auth{query} HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n")
}

#[test]
fn classify_valid_auth_request_returns_callback_url() {
let head = auth_head("?token=jwt&state=deadbeef");
let outcome = classify_request(&head, "deadbeef", 53824);
assert_eq!(
outcome,
RequestOutcome::AuthCallback {
callback_url: "http://127.0.0.1:53824/auth?token=jwt&state=deadbeef".to_string()
}
);
}

#[test]
fn classify_wrong_state_returns_state_mismatch() {
let head = auth_head("?token=jwt&state=wrong");
assert_eq!(
classify_request(&head, "correct", 53824),
RequestOutcome::StateMismatch
);
}

#[test]
fn classify_missing_state_returns_state_mismatch() {
let head = auth_head("?token=jwt");
assert_eq!(
classify_request(&head, "expected", 53824),
RequestOutcome::StateMismatch
);
}

#[test]
fn classify_no_query_string_on_auth_path_returns_state_mismatch() {
let head = "GET /auth HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n";
assert_eq!(
classify_request(head, "nonce", 53824),
RequestOutcome::StateMismatch
);
}

#[test]
fn classify_favicon_returns_not_found() {
let head = "GET /favicon.ico HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n";
assert_eq!(
classify_request(head, "state", 53824),
RequestOutcome::NotFound
);
}

#[test]
fn classify_root_path_returns_not_found() {
let head = "GET / HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n";
assert_eq!(
classify_request(head, "state", 53824),
RequestOutcome::NotFound
);
}

#[test]
fn classify_post_method_returns_method_not_allowed() {
let head = "POST /auth?state=abc HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n";
assert_eq!(
classify_request(head, "abc", 53824),
RequestOutcome::MethodNotAllowed
);
}

#[test]
fn classify_callback_url_uses_bound_port() {
let head = auth_head("?state=s&token=t");
let outcome = classify_request(&head, "s", 12345);
assert_eq!(
outcome,
RequestOutcome::AuthCallback {
callback_url: "http://127.0.0.1:12345/auth?state=s&token=t".to_string()
}
);
}

#[test]
fn classify_state_only_query_returns_callback() {
// Minimal valid request: only state param, no other query params.
let head = auth_head("?state=abc123");
assert_eq!(
classify_request(&head, "abc123", 53824),
RequestOutcome::AuthCallback {
callback_url: "http://127.0.0.1:53824/auth?state=abc123".to_string()
}
);
}

// ── bind_loopback (integration: real OS socket) ─────────────────────────

#[tokio::test]
async fn bind_loopback_succeeds_on_ephemeral_port() {
let listener = bind_loopback(0).expect("bind on port 0 must succeed");
let addr = listener.local_addr().expect("must have local addr");
assert!(addr.ip().is_loopback());
assert_ne!(addr.port(), 0, "OS should assign a non-zero ephemeral port");
}

#[tokio::test]
async fn bind_loopback_allows_rebind_via_so_reuseaddr() {
// Bind once, drop the listener, then bind again on the same port. The
// short TIME_WAIT window should not block the rebind because we set
// SO_REUSEADDR.
let listener = bind_loopback(0).expect("first bind");
let port = listener.local_addr().unwrap().port();
drop(listener);
let _ = bind_loopback(port).expect("rebind on same port must succeed with SO_REUSEADDR");
}
}
12 changes: 9 additions & 3 deletions app/test/e2e/specs/mega-flow.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,17 @@ async function resetEverything(label: string): Promise<void> {
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({}),
}).catch(() => {});
clearRequestLog();
resetMockBehavior();
// Settle a beat so any in-flight reactive HTTP calls don't bleed into the
// next scenario's request log.
// Settle a beat so any in-flight reactive HTTP calls from the previous
// scenario arrive and can be discarded — then clear AFTER the pause so
// stale requests don't appear in the next scenario's waitForMockRequest
// polls. (Clearing before the pause caused a race: in-flight /auth/me or
// /telegram/login-tokens/ requests from scenario N arrived during the
// 800 ms window, landed in the fresh log, and were matched by scenario
// N+1's waitForMockRequest — causing composio RPCs to fire before the
// new deep-link's auth flow completed.)
await browser.pause(800);
clearRequestLog();
}

describe('Mega flow — login + Gmail OAuth + Composio in one session', () => {
Expand Down
55 changes: 2 additions & 53 deletions src/openhuman/memory/ops/documents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,63 +492,12 @@ pub async fn memory_recall_memories(

#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::sync::OnceLock;

use serde_json::json;
use tempfile::TempDir;

use super::*;

/// Held for the whole test: pins `OPENHUMAN_WORKSPACE` at a stable,
/// never-torn-down workspace under `TEST_ENV_LOCK`. These tests call
/// `memory_init` → `current_workspace_dir` → `Config::load_or_init`, which
/// reads that process-global env var; without this, a concurrent
/// env-mutating test can swap the var and tear down its tempdir mid-call,
/// yielding `SQLITE_IOERR` / config atomic-replace `ENOENT`.
struct WorkspaceEnvGuard {
_lock: std::sync::MutexGuard<'static, ()>,
previous: Option<std::ffi::OsString>,
}

impl WorkspaceEnvGuard {
fn set(path: &std::path::Path) -> Self {
let lock = crate::openhuman::config::TEST_ENV_LOCK
.lock()
.unwrap_or_else(|e| e.into_inner());
let previous = std::env::var_os("OPENHUMAN_WORKSPACE");
std::env::set_var("OPENHUMAN_WORKSPACE", path);
Self {
_lock: lock,
previous,
}
}
}

impl Drop for WorkspaceEnvGuard {
fn drop(&mut self) {
if let Some(previous) = self.previous.as_ref() {
std::env::set_var("OPENHUMAN_WORKSPACE", previous);
} else {
std::env::remove_var("OPENHUMAN_WORKSPACE");
}
}
}

#[must_use]
fn ensure_memory_client() -> WorkspaceEnvGuard {
static WORKSPACE: OnceLock<PathBuf> = OnceLock::new();
let workspace = WORKSPACE.get_or_init(|| {
let tmp = TempDir::new().expect("tempdir");
let path = tmp.path().join("workspace");
std::fs::create_dir_all(&path).expect("workspace dir");
std::mem::forget(tmp);
path
});
// Pin the env BEFORE init so config load/save targets the stable dir.
let env_guard = WorkspaceEnvGuard::set(workspace);
let _ = crate::openhuman::memory::global::init(workspace.clone());
env_guard
fn ensure_memory_client() {
crate::openhuman::memory::ops::ensure_shared_memory_client();
}

fn unique_namespace(prefix: &str) -> String {
Expand Down
15 changes: 1 addition & 14 deletions src/openhuman/memory/ops/kv_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,10 @@ pub async fn graph_query(

#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::sync::OnceLock;

use tempfile::TempDir;

use super::*;

fn ensure_memory_client() {
static WORKSPACE: OnceLock<PathBuf> = OnceLock::new();
let workspace = WORKSPACE.get_or_init(|| {
let tmp = TempDir::new().expect("tempdir");
let path = tmp.path().join("workspace");
std::fs::create_dir_all(&path).expect("workspace dir");
std::mem::forget(tmp);
path
});
let _ = crate::openhuman::memory::global::init(workspace.clone());
crate::openhuman::memory::ops::ensure_shared_memory_client();
}

fn unique_namespace(prefix: &str) -> String {
Expand Down
11 changes: 1 addition & 10 deletions src/openhuman/memory/ops/learn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ pub async fn memory_learn_all(
mod tests {
use std::ffi::OsString;
use std::path::PathBuf;
use std::sync::OnceLock;

use serde_json::json;
use tempfile::TempDir;
Expand All @@ -166,15 +165,7 @@ mod tests {
use crate::openhuman::memory_store::NamespaceDocumentInput;

fn ensure_memory_client() {
static WORKSPACE: OnceLock<PathBuf> = OnceLock::new();
let workspace = WORKSPACE.get_or_init(|| {
let tmp = TempDir::new().expect("tempdir");
let path = tmp.path().join("workspace");
std::fs::create_dir_all(&path).expect("workspace dir");
std::mem::forget(tmp);
path
});
let _ = crate::openhuman::memory::global::init(workspace.clone());
crate::openhuman::memory::ops::ensure_shared_memory_client();
}

struct WorkspaceEnvGuard {
Expand Down
Loading
Loading