Skip to content

Commit 65c5174

Browse files
committed
Validate document polling
1 parent e9dd7bc commit 65c5174

4 files changed

Lines changed: 234 additions & 169 deletions

File tree

src/adeu/mcp_components/shared.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
from typing import List
88

99
# Centralized MCP Configuration
10-
FRONTEND_URL = os.environ.get("ADEU_FRONTEND_URL", "https://app.adeu.ai")
11-
BACKEND_URL = os.environ.get("ADEU_BACKEND_URL", "https://app.adeu.ai")
10+
FRONTEND_URL = os.environ.get("ADEU_FRONTEND_URL", "http://localhost:5173")
11+
BACKEND_URL = os.environ.get("ADEU_BACKEND_URL", "http://localhost:8000")
1212
MARKDOWN_UI_URI = "ui://adeu/markdown-ui"
1313
EMAIL_UI_URI = "ui://adeu/email-ui"
1414

Lines changed: 133 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
# FILE: src/adeu/mcp_components/tools/validation.py
2+
import asyncio
23
import json
34
import urllib.error
45
import urllib.request
56
from pathlib import Path
6-
from typing import Annotated, List
7+
from typing import Annotated, List, Optional
78

89
from fastmcp import Context
910
from fastmcp.dependencies import Depends
@@ -21,118 +22,150 @@
2122

2223
@tool(
2324
description=(
24-
"Analyzes documents to find inconsistencies, contradictions, and risk assessments. "
25-
"Always present the complete report to the user, including every verbatim evidence quote "
26-
"exactly as returned, without summarizing or omitting any findings."
27-
"Run this on validation request for files or directories."
25+
"Validates documents for inconsistencies, contradictions, and risk assessments. "
26+
"To START a new validation, provide 'file_paths'. This will immediately return a task_id. "
27+
"To CHECK the status of a validation, call this tool AGAIN and provide ONLY the 'task_id'. "
28+
"The checking process will poll for up to 50 seconds. If it times out, continue checking."
2829
),
2930
timeout=300.0,
3031
annotations={"openWorldHint": True},
3132
meta={"ui": {"resourceUri": MARKDOWN_UI_URI}},
3233
)
3334
async def validate_documents(
34-
file_paths: Annotated[List[str], "List of absolute paths to documents (DOCX, PDF) OR directories."],
3535
ctx: Context,
36+
file_paths: Annotated[
37+
Optional[List[str]],
38+
"List of absolute paths to documents (DOCX, PDF) OR directories to start a new job.",
39+
] = None,
40+
task_id: Annotated[Optional[int], "If resuming a pending check, provide the task ID here."] = None,
3641
api_key: str = Depends(get_cloud_auth_token),
3742
) -> ToolResult:
38-
await ctx.info("Starting document validation", extra={"provided_paths": file_paths})
39-
40-
if not file_paths:
41-
await ctx.warning("No file paths provided by client")
42-
raise ToolError("You must provide at least 1 file path or directory to perform document validation.")
43-
44-
resolved_files: list[Path] = []
45-
valid_extensions = {".docx", ".pdf"}
46-
47-
await ctx.debug("Resolving files and directories")
48-
for path_str in file_paths:
49-
p = Path(path_str)
50-
if not p.exists():
51-
await ctx.error("Path not found on disk", extra={"missing_path": path_str})
52-
raise ToolError(f"Path not found on local disk: {path_str}")
53-
54-
if p.is_dir():
55-
for child in p.iterdir():
56-
if child.is_file() and child.suffix.lower() in valid_extensions:
57-
resolved_files.append(child)
58-
elif p.is_file():
59-
if p.suffix.lower() not in valid_extensions:
60-
await ctx.warning("Unsupported file type skipped", extra={"file": path_str})
61-
raise ToolError(f"Unsupported file type for {path_str}. Only .docx and .pdf are supported.")
62-
resolved_files.append(p)
63-
64-
resolved_files = list(set(resolved_files))
65-
66-
if not resolved_files:
67-
await ctx.error("No valid documents found in provided paths")
68-
raise ToolError("No supported documents (.docx or .pdf) were found in the provided paths.")
69-
70-
await ctx.info(
71-
f"Resolved {len(resolved_files)} file(s) for validation",
72-
extra={"files": [p.name for p in resolved_files]},
73-
)
74-
75-
files_data = []
76-
for p in resolved_files:
77-
try:
43+
if not file_paths and not task_id:
44+
raise ToolError(
45+
"You must provide either 'file_paths' to start a new validation, or 'task_id' to check an existing one."
46+
)
47+
48+
# ==========================================
49+
# PHASE 1: INIT (Upload and get task_id)
50+
# ==========================================
51+
if file_paths:
52+
await ctx.info(
53+
"Starting new document validation task",
54+
extra={"provided_paths": file_paths},
55+
)
56+
resolved_files: list[Path] = []
57+
valid_extensions = {".docx", ".pdf"}
58+
59+
for path_str in file_paths:
60+
p = Path(path_str)
61+
if not p.exists():
62+
raise ToolError(f"Path not found on local disk: {path_str}")
63+
64+
if p.is_dir():
65+
for child in p.iterdir():
66+
if child.is_file() and child.suffix.lower() in valid_extensions:
67+
resolved_files.append(child)
68+
elif p.is_file():
69+
if p.suffix.lower() not in valid_extensions:
70+
raise ToolError(f"Unsupported file type for {path_str}. Only .docx and .pdf are supported.")
71+
resolved_files.append(p)
72+
73+
resolved_files = list(set(resolved_files))
74+
if not resolved_files:
75+
raise ToolError("No supported documents (.docx or .pdf) were found in the provided paths.")
76+
77+
files_data = []
78+
for p in resolved_files:
7879
with open(p, "rb") as f:
7980
files_data.append(("files", p.name, f.read()))
80-
except Exception as e:
81-
await ctx.error("Failed to read file", extra={"filename": p.name, "error": str(e)})
82-
raise ToolError(f"Failed to read file {p.name}: {str(e)}") from e
83-
84-
await ctx.debug("Encoding multipart/form-data payload")
85-
body, content_type = _encode_multipart_formdata(files_data)
86-
url = f"{BACKEND_URL}/api/v1/documents/validate"
87-
88-
req = urllib.request.Request(
89-
url,
90-
data=body,
91-
headers={
92-
"Authorization": f"Bearer {api_key}",
93-
"Content-Type": content_type,
94-
"Accept": "application/json",
95-
},
96-
method="POST",
97-
)
98-
99-
try:
100-
await ctx.info(
101-
"Sending validation request to Adeu Cloud",
102-
extra={"url": url, "payload_size_bytes": len(body)},
81+
82+
body, content_type = _encode_multipart_formdata(files_data)
83+
url = f"{BACKEND_URL}/api/v1/documents/validate"
84+
85+
req = urllib.request.Request(
86+
url,
87+
data=body,
88+
headers={
89+
"Authorization": f"Bearer {api_key}",
90+
"Content-Type": content_type,
91+
"Accept": "application/json",
92+
},
93+
method="POST",
10394
)
104-
with urllib.request.urlopen(req) as response:
105-
data = json.loads(response.read().decode("utf-8"))
10695

107-
await ctx.debug("Received successful response from cloud API")
96+
try:
97+
response = await asyncio.to_thread(urllib.request.urlopen, req)
98+
data = json.loads(response.read().decode("utf-8"))
99+
new_task_id = data.get("task_id")
108100

109-
# The backend now provides the fully formatted markdown report
110-
markdown_report = data.get("report_markdown", "No report generated.")
111-
report_title = (
112-
resolved_files[0].name
113-
if len(resolved_files) == 1
114-
else f"Validation Report ({len(resolved_files)} files)"
115-
)
116-
return ToolResult(
117-
content=markdown_report,
118-
structured_content={"markdown": markdown_report, "title": report_title},
101+
msg = (
102+
f"Validation task started successfully. Task ID: {new_task_id}. "
103+
f"Please call `validate_documents` again immediately with "
104+
f"task_id={new_task_id} to monitor the progress."
119105
)
120-
121-
except urllib.error.HTTPError as e:
122-
if e.code == 401:
123-
await ctx.warning("Cloud authentication expired during validation")
124-
DesktopAuthManager.clear_api_key()
125-
raise ToolError("Your authentication expired. Please call `login_to_adeu_cloud` to re-authenticate.") from e
126-
elif e.code == 403:
127-
await ctx.warning("Authorization Error: User lacks access to this tool")
128-
raise ToolError("Authorization Error: You do not have access to use this tool.") from e
129-
130-
error_body = e.read().decode("utf-8")
131-
await ctx.error(
132-
"Cloud validation API failure",
133-
extra={"status_code": e.code, "body": error_body},
106+
await ctx.info(f"Task started: {new_task_id}")
107+
return ToolResult(content=msg, structured_content={"status": "pending", "message": msg})
108+
109+
except urllib.error.HTTPError as e:
110+
if e.code == 401:
111+
DesktopAuthManager.clear_api_key()
112+
raise ToolError(
113+
"Your authentication expired. Please call `login_to_adeu_cloud` to re-authenticate."
114+
) from e
115+
error_body = e.read().decode("utf-8")
116+
raise ToolError(f"Cloud analysis failed (HTTP {e.code}): {error_body}") from e
117+
except Exception as e:
118+
raise ToolError(f"Unexpected error: {str(e)}") from e
119+
120+
# ==========================================
121+
# PHASE 2: POLL (Wait for completion)
122+
# ==========================================
123+
poll_url = f"{BACKEND_URL}/api/v1/documents/validate/{task_id}"
124+
125+
# Poll up to 10 times (5 seconds each) = 50 seconds total
126+
for attempt in range(10):
127+
req = urllib.request.Request(
128+
poll_url,
129+
headers={
130+
"Authorization": f"Bearer {api_key}",
131+
"Accept": "application/json",
132+
},
134133
)
135-
raise ToolError(f"Cloud analysis failed (HTTP {e.code}): {error_body}") from e
136-
except Exception as e:
137-
await ctx.error("Unexpected error communicating with Adeu Cloud", extra={"error": str(e)})
138-
raise ToolError(f"Failed to communicate with Adeu Cloud: {str(e)}") from e
134+
135+
try:
136+
response = await asyncio.to_thread(urllib.request.urlopen, req)
137+
data = json.loads(response.read().decode("utf-8"))
138+
status = data.get("status")
139+
140+
if status == "COMPLETED":
141+
markdown_report = data.get("report_markdown", "No report generated.")
142+
return ToolResult(
143+
content=markdown_report,
144+
structured_content={
145+
"markdown": markdown_report,
146+
"title": f"Validation Report #{task_id}",
147+
"status": "completed",
148+
},
149+
)
150+
151+
if status == "FAILED":
152+
error_msg = data.get("error", "Unknown internal error")
153+
raise ToolError(f"Validation task failed on the server: {error_msg}")
154+
155+
await ctx.debug(f"Task {task_id} status is {status}. Attempt {attempt + 1}/10. Sleeping 5s.")
156+
157+
except urllib.error.HTTPError as e:
158+
if e.code == 401:
159+
DesktopAuthManager.clear_api_key()
160+
raise ToolError("Your authentication expired. Please re-authenticate.") from e
161+
error_body = e.read().decode("utf-8")
162+
raise ToolError(f"Failed to check task status (HTTP {e.code}): {error_body}") from e
163+
except Exception as e:
164+
raise ToolError(f"Unexpected error checking task status: {str(e)}") from e
165+
166+
# Sleep 5 seconds before the next poll
167+
await asyncio.sleep(5)
168+
169+
# If we reach here, the 50s timeout has been reached but it's still pending
170+
msg = f"Task {task_id} is still processing. Please call `validate_documents` again with task_id={task_id}."
171+
return ToolResult(content=msg, structured_content={"status": "pending", "message": msg})

0 commit comments

Comments
 (0)