diff --git a/studio/api.py b/studio/api.py
index fc2cad2..1697fc5 100644
--- a/studio/api.py
+++ b/studio/api.py
@@ -63,7 +63,7 @@ def _convert_prompts_to_yaml_format(prompts: List[Dict[str, Any]]) -> List[Dict[
     Frontend format: [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}]
     SyGra YAML format: [{"system": "..."}, {"user": "..."}]
-    
+
     Multi-modal frontend format: [{"role": "user", "content": [{"type": "text", "text": "..."}, {"type": "audio_url", "audio_url": "..."}]}]
     Multi-modal YAML format: [{"user": [{"type": "text", "text": "..."}, {"type": "audio_url", "audio_url": "..."}]}]
     """
@@ -1039,7 +1039,7 @@ async def get_workflow_sample_data(workflow_id: str, limit: int = 3, source_inde
         table = source.get("table")
         fields = source.get("fields", [])
         alias = source.get("alias")
-        
+
         if not table:
             return {
                 "records": [],
@@ -1047,11 +1047,11 @@ async def get_workflow_sample_data(workflow_id: str, limit: int = 3, source_inde
                 "source_type": "servicenow",
                 "message": "No table specified for ServiceNow source"
             }
-        
+
         try:
             from sygra.core.dataset.servicenow_handler import ServiceNowHandler
             from sygra.core.dataset.dataset_config import DataSourceConfig as SygraDataSourceConfig
-            
+
             # Build source config for ServiceNowHandler
            snow_config = SygraDataSourceConfig(
                 type="servicenow",
@@ -1062,10 +1062,10 @@ async def get_workflow_sample_data(workflow_id: str, limit: int = 3, source_inde
                 order_by=source.get("order_by"),
                 order_desc=source.get("order_desc", False),
             )
-            
+
             handler = ServiceNowHandler(source_config=snow_config)
             raw_records = handler.read()
-            
+
             # Flatten records (ServiceNow returns value/display_value dicts)
             records = []
             for record in raw_records[:limit]:
@@ -1076,7 +1076,7 @@ async def get_workflow_sample_data(workflow_id: str, limit: int = 3, source_inde
                     else:
                         flat_record[key] = value
                 records.append(flat_record)
-            
+
             total_count = len(raw_records)
             return {
                 "records": records,
@@ -1086,7 +1086,7 @@ async def get_workflow_sample_data(workflow_id: str, limit: int = 3, source_inde
                 "alias": alias,
                 "message": None
             }
-            
+
         except Exception as snow_err:
             # Fall back to showing configured fields if connection fails
             error_msg = str(snow_err)
@@ -1446,6 +1446,91 @@ async def get_execution(execution_id: str):
     raise HTTPException(status_code=404, detail=f"Execution {execution_id} not found")
 
 
+@app.get("/api/executions/{execution_id}/artifacts/post-processors")
+async def get_post_processor_artifacts(execution_id: str):
+    """List post-processor artifact files found next to an execution's main output file."""
+    if execution_id in _executions:
+        execution = _executions[execution_id]
+    else:
+        storage = _get_execution_storage()
+        execution = storage.get_execution(execution_id)
+    if not execution:
+        raise HTTPException(status_code=404, detail=f"Execution {execution_id} not found")
+
+    output_file = execution.output_file
+    if not output_file and isinstance(execution.metadata, dict):
+        output_file = execution.metadata.get("output_file")
+
+    if not output_file:
+        return {"files": []}
+
+    output_path = Path(output_file)
+    if not output_path.is_absolute():
+        output_path = (Path.cwd() / output_path).resolve()
+    else:
+        output_path = output_path.resolve()
+
+    out_dir = output_path.parent
+    out_name = output_path.name
+
+    output_ts = None
+    run_prefix = None
+
+    # Derive the run prefix and timestamp from the main output filename.
+    if "_output_" in out_name and out_name.endswith(".json"):
+        run_prefix, remainder = out_name.split("_output_", 1)
+        output_ts = remainder[:-5]
+    elif out_name.startswith("output_") and out_name.endswith(".json"):
+        output_ts = out_name[len("output_"):-5]
+
+    matches: set[Path] = set()
+
+    if run_prefix:
+        matches.update(p for p in out_dir.glob(f"{run_prefix}_*.json") if p.is_file())
+
+    if output_ts:
+        matches.update(p for p in out_dir.glob(f"*_{output_ts}.json") if p.is_file())
+
+    def _is_main_output_file(p: Path) -> bool:
+        name = p.name
+        return (
+            ("_output_" in name and name.endswith(".json"))
+            or (name.startswith("output_") and name.endswith(".json"))
+        )
+
+    files: list[Path] = []
+    for p in matches:
+        if p.resolve() == output_path.resolve():
+            continue
+        if _is_main_output_file(p):
+            continue
+        files.append(p)
+
+    files.sort(key=lambda x: x.name)
+
+    def _as_return_path(p: Path) -> str:
+        try:
+            return str(p.resolve().relative_to(Path.cwd().resolve()))
+        except Exception:
+            return str(p.resolve())
+
+    semantic_dedup_report: Optional[Path] = None
+    semantic_dedup_final: Optional[Path] = None
+    for p in files:
+        name = p.name
+        if semantic_dedup_report is None and "semantic_dedup_report_" in name:
+            semantic_dedup_report = p
+        if semantic_dedup_final is None and "SemanticDedupPostProcessor_" in name:
+            semantic_dedup_final = p
+        if semantic_dedup_report is not None and semantic_dedup_final is not None:
+            break
+
+    return {
+        "files": [_as_return_path(p) for p in files],
+        "semantic_dedup": {
+            "report_file": _as_return_path(semantic_dedup_report) if semantic_dedup_report else None,
+            "final_output_data_file": _as_return_path(semantic_dedup_final) if semantic_dedup_final else None,
+        },
+    }
+
+
 @app.post("/api/executions/{execution_id}/cancel")
 async def cancel_execution(execution_id: str):
     """
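Note on the matching logic above: the endpoint derives an optional run prefix (everything before `_output_`) and a trailing timestamp from the main output filename, unions two globs on those keys, and then filters out anything that is itself a main output. A minimal sketch of that rule with hypothetical filenames (a plain list stands in for the directory glob):

```python
# Hypothetical filenames in an execution's output directory.
main_output = "my_run_output_2024-01-02T10-30-00.json"
siblings = [
    "my_run_semantic_dedup_report_2024-01-02T10-30-00.json",  # prefix and timestamp match
    "SemanticDedupPostProcessor_2024-01-02T10-30-00.json",    # timestamp match only
    "my_run_output_2024-01-02T10-30-00.json",                 # the main output itself: excluded
    "other_run_output_2024-01-01T09-00-00.json",              # different run: excluded
]

# "<run_prefix>_output_<timestamp>.json" -> ("my_run", "2024-01-02T10-30-00")
run_prefix, remainder = main_output.split("_output_", 1)
output_ts = remainder[:-5]  # strip ".json"

def is_main_output_file(name: str) -> bool:
    # Same rule the endpoint uses to skip primary output files.
    return ("_output_" in name and name.endswith(".json")) or (
        name.startswith("output_") and name.endswith(".json")
    )

artifacts = sorted(
    name
    for name in siblings
    if (name.startswith(f"{run_prefix}_") or name.endswith(f"_{output_ts}.json"))
    and not is_main_output_file(name)
)
print(artifacts)
# ['SemanticDedupPostProcessor_2024-01-02T10-30-00.json',
#  'my_run_semantic_dedup_report_2024-01-02T10-30-00.json']
```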
@@ -2048,7 +2133,7 @@ async def get_node_code(workflow_id: str, node_id: str, code_type: str):
         if node_config:
             # Check for class path in node metadata (original_config from YAML)
             original_config = node_config.metadata.get("original_config", {}) if node_config.metadata else {}
-            
+
             if code_type == 'pre_process':
                 class_path = original_config.get("pre_process") or getattr(node_config, "pre_process", None)
             elif code_type == 'post_process':
@@ -2061,13 +2146,13 @@ async def get_node_code(workflow_id: str, node_id: str, code_type: str):
                 # Extract class name from path (e.g., "tasks.examples.foo.task_executor.MyClass" -> "MyClass")
                 parts = class_path.split(".")
                 target_class_name = parts[-1] if parts else None
-                
+
                 # Determine the file path from the module path
                 # e.g., "tasks.examples.foo.task_executor.MyClass" -> "tasks/examples/foo/task_executor.py"
                 if len(parts) > 1:
                     module_parts = parts[:-1]  # Everything except the class name
                     relative_path = "/".join(module_parts) + ".py"
-                    
+
                     # Try to find the file relative to the project root
                     # First try: relative to workflow directory's parent (tasks folder level)
                     potential_paths = [
@@ -2076,7 +2161,7 @@ async def get_node_code(workflow_id: str, node_id: str, code_type: str):
                         workflow_dir.parent.parent / relative_path,
                         Path.cwd() / relative_path,
                     ]
-                    
+
                     for potential_path in potential_paths:
                         if potential_path.exists():
                             target_file_path = potential_path
@@ -6339,7 +6424,7 @@ def _get_node_code_from_file(content: str, node_id: str, code_type: str, target_
         # 2. Otherwise, fall back to node_id-based matching
         if detected_type == code_type:
             match_found = False
-            
+
             # Priority 1: Match by specific class name from YAML config
             if target_class_name and class_name == target_class_name:
                 match_found = True
@@ -6348,7 +6433,7 @@ def _get_node_code_from_file(content: str, node_id: str, code_type: str, target_
                 normalized_class_id = re.sub(r'[^a-zA-Z0-9_]', '', class_safe_id.replace('-', '_'))
                 if normalized_class_id == safe_node_id or class_name == expected_name:
                     match_found = True
-            
+
             if match_found:
                 start_line = node.lineno - 1  # 0-indexed
                 end_line = node.end_lineno if hasattr(node, 'end_lineno') else start_line + 1
diff --git a/studio/frontend/src/lib/components/runs/RunDetailsViewEnhanced.svelte b/studio/frontend/src/lib/components/runs/RunDetailsViewEnhanced.svelte
index 19db8af..f39f13b 100644
--- a/studio/frontend/src/lib/components/runs/RunDetailsViewEnhanced.svelte
+++ b/studio/frontend/src/lib/components/runs/RunDetailsViewEnhanced.svelte
@@ -247,6 +247,154 @@
 		a.click();
 		URL.revokeObjectURL(url);
 	}
+
+	function getBasename(path: string): string {
+		const parts = path.split('/');
+		return parts[parts.length - 1] || path;
+	}
+
+	async function downloadJsonData(data: unknown, filename: string) {
+		const json = JSON.stringify(data, null, 2);
+		const blob = new Blob([json], { type: 'application/json' });
+		const url = URL.createObjectURL(blob);
+		const a = document.createElement('a');
+		a.href = url;
+		a.download = filename;
+		a.click();
+		URL.revokeObjectURL(url);
+	}
+
+	async function fetchJsonFromPath(path: string): Promise<unknown> {
+		try {
+			const url = `/api/media/file?path=${encodeURIComponent(path)}&workflow_id=${encodeURIComponent(execution.workflow_id)}`;
+			const resp = await fetch(url);
+			if (!resp.ok) return null;
+			const text = await resp.text();
+			return JSON.parse(text);
+		} catch {
+			return null;
+		}
+	}
+
+	function formatPostProcessorKey(key: string): string {
+		return key
+			.split('_')
+			.filter(Boolean)
+			.map((part) => part.charAt(0).toUpperCase() + part.slice(1))
+			.join(' ');
+	}
+
+	function formatPostProcessorArtifactLabel(label: string): string {
+		return label
+			.replace(/_file$/i, '')
+			.split('_')
+			.filter(Boolean)
+			.map((part) => part.charAt(0).toUpperCase() + part.slice(1))
+			.join(' ');
+	}
+
+	type PostProcessorArtifactsIndex = {
+		files?: string[];
+		[key: string]: unknown;
+	};
+
+	async function fetchPostProcessorArtifactsIndex(): Promise<PostProcessorArtifactsIndex | null> {
+		try {
+			const resp = await fetch(`/api/executions/${encodeURIComponent(execution.id)}/artifacts/post-processors`);
+			if (!resp.ok) return null;
+			const payload = await resp.json();
+			return payload;
+		} catch {
+			return null;
+		}
+	}
+
+	let postProcessorArtifactsLoading = $state(false);
+	let postProcessorArtifactsLoaded = $state(false);
+	type PostProcessorGroupArtifact = { label: string; path: string; data: unknown };
+	type PostProcessorGroup = { key: string; artifacts: PostProcessorGroupArtifact[] };
+	let postProcessorGroups = $state<PostProcessorGroup[]>([]);
+	let semanticDedupGroup = $derived(() => postProcessorGroups.find((g) => g.key === 'semantic_dedup') ?? null);
+	let semanticDedupAvailable = $derived(() => {
+		const g = semanticDedupGroup();
+		return !!g && g.artifacts.length > 0;
+	});
+	let postProcessorGroupsAvailable = $derived(() => postProcessorGroups.some((g) => g.artifacts.length > 0));
+
+	type PostProcessorArtifact = { path: string; data: unknown };
+	let postProcessorArtifacts = $state<PostProcessorArtifact[]>([]);
+	let postProcessorArtifactsAvailable = $derived(() => postProcessorArtifacts.length > 0);
+
+	let lastSemanticDedupProbeKey = $state<string | null>(null);
+	let semanticDedupProbeKey = $derived(() => {
+		const runName = metadata?.execution?.run_name ?? '';
+		const outFile = execution.output_file ?? '';
+		return `${execution.id}:${runName}:${outFile}`;
+	});
+
+	$effect(() => {
+		if (semanticDedupProbeKey() !== lastSemanticDedupProbeKey) {
+			lastSemanticDedupProbeKey = semanticDedupProbeKey();
+			postProcessorArtifactsLoading = false;
+			postProcessorArtifactsLoaded = false;
+			postProcessorGroups = [];
+			postProcessorArtifacts = [];
+		}
+	});
+
+	async function loadPostProcessorArtifacts() {
+		if (postProcessorArtifactsLoading || postProcessorArtifactsLoaded) return;
+		postProcessorArtifactsLoading = true;
+		try {
+			const idx = await fetchPostProcessorArtifactsIndex();
+			const files = Array.isArray(idx?.files) ? (idx?.files as string[]) : [];
+			const entries = idx ? Object.entries(idx) : [];
+			const groupEntries = entries.filter(([k, v]) => {
+				if (k === 'files') return false;
+				if (!v || typeof v !== 'object') return false;
+				return Object.values(v as Record<string, unknown>).some((val) => typeof val === 'string' && val);
+			});
+
+			postProcessorGroups = await Promise.all(
+				groupEntries.map(async ([key, v]) => {
+					const obj = v as Record<string, unknown>;
+					const pathEntries = Object.entries(obj).filter(([, value]) => typeof value === 'string' && value);
+					const artifacts = await Promise.all(
+						pathEntries.map(async ([label, path]) => {
+							const data = await fetchJsonFromPath(path as string);
+							return data === null ? null : ({ label, path: path as string, data } satisfies PostProcessorGroupArtifact);
+						})
+					);
+					return { key, artifacts: artifacts.filter((x): x is PostProcessorGroupArtifact => x !== null) };
+				})
+			);
+
+			const groupPaths = new Set<string>();
+			for (const g of postProcessorGroups) {
+				for (const a of g.artifacts) groupPaths.add(a.path);
+			}
+
+			const filteredPaths = files.filter((p) => p && !groupPaths.has(p));
+			const uniquePaths = Array.from(new Set(filteredPaths));
+
+			const results = await Promise.all(
+				uniquePaths.map(async (path) => {
+					const data = await fetchJsonFromPath(path);
+					return data === null ? null : ({ path, data } satisfies PostProcessorArtifact);
+				})
+			);
+
+			postProcessorArtifacts = results.filter((x): x is PostProcessorArtifact => x !== null);
+			postProcessorArtifactsLoaded = true;
+		} finally {
+			postProcessorArtifactsLoading = false;
+		}
+	}
+
+	$effect(() => {
+		if (activeTab === 'output') {
+			loadPostProcessorArtifacts();
+		}
+	});
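For reference, the index the loader consumes mirrors the endpoint's return value: a flat `files` list plus named groups such as `semantic_dedup`. A sketch with hypothetical paths showing how `loadPostProcessorArtifacts()` splits grouped from ungrouped entries:

```python
# Hypothetical payload from GET /api/executions/{execution_id}/artifacts/post-processors.
payload = {
    "files": [
        "output/my_run/SemanticDedupPostProcessor_2024-01-02T10-30-00.json",
        "output/my_run/my_run_semantic_dedup_report_2024-01-02T10-30-00.json",
    ],
    "semantic_dedup": {
        "report_file": "output/my_run/my_run_semantic_dedup_report_2024-01-02T10-30-00.json",
        "final_output_data_file": "output/my_run/SemanticDedupPostProcessor_2024-01-02T10-30-00.json",
    },
}

# Mirror of the grouping rule: every key other than "files" whose value is an
# object containing string paths becomes a named group; leftover "files"
# entries are listed individually.
group_paths = {
    path
    for key, group in payload.items()
    if key != "files" and isinstance(group, dict)
    for path in group.values()
    if isinstance(path, str) and path
}
ungrouped = [p for p in payload["files"] if p and p not in group_paths]
print(ungrouped)  # [] -- both files already belong to the semantic_dedup group
```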
@@ -670,7 +818,200 @@
 							showViewToggle={true}
 							defaultView="table"
 						/>
-					{:else}
+					{/if}
+
+					{#if semanticDedupAvailable()}
+						<section>
+							<header>
+								<h3>Semantic Deduplication</h3>
+							</header>
+							<div>
+								{#each semanticDedupGroup()!.artifacts as artifact (artifact.path)}
+									<div>
+										<div>
+											<span>{formatPostProcessorArtifactLabel(artifact.label)}</span>
+											<button onclick={() => downloadJsonData(artifact.data, getBasename(artifact.path))}>
+												Download
+											</button>
+										</div>
+										{#if typeof artifact.data === 'object' && artifact.data !== null && artifact.label === 'report_file' && 'duplicates' in (artifact.data as any) && Array.isArray((artifact.data as any).duplicates)}
+											<pre>{JSON.stringify((artifact.data as any).duplicates, null, 2)}</pre>
+										{:else if Array.isArray(artifact.data)}
+											<pre>{JSON.stringify(artifact.data, null, 2)}</pre>
+										{:else if typeof artifact.data === 'object' && artifact.data !== null}
+											<pre>{JSON.stringify(artifact.data, null, 2)}</pre>
+										{:else}
+											<pre>{JSON.stringify(artifact.data, null, 2)}</pre>
+										{/if}
+									</div>
+								{/each}
+							</div>
+						</section>
+					{/if}
+
+					{#each postProcessorGroups.filter((g) => g.key !== 'semantic_dedup' && g.artifacts.length > 0) as group (group.key)}
+						<section>
+							<header>
+								<h3>{formatPostProcessorKey(group.key)}</h3>
+							</header>
+							<div>
+								{#each group.artifacts as artifact (artifact.path)}
+									<div>
+										<div>
+											<span>{formatPostProcessorArtifactLabel(artifact.label)}</span>
+											<button onclick={() => downloadJsonData(artifact.data, getBasename(artifact.path))}>
+												Download
+											</button>
+										</div>
+										{#if Array.isArray(artifact.data)}
+											<pre>{JSON.stringify(artifact.data, null, 2)}</pre>
+										{:else if typeof artifact.data === 'object' && artifact.data !== null}
+											<pre>{JSON.stringify(artifact.data, null, 2)}</pre>
+										{:else}
+											<pre>{JSON.stringify(artifact.data, null, 2)}</pre>
+										{/if}
+									</div>
+								{/each}
+							</div>
+						</section>
+					{/each}
+
+					{#if postProcessorArtifactsLoading}
+						<div>
+							<span>Loading post-processor artifacts...</span>
+						</div>
+					{/if}
+
+					{#if postProcessorArtifactsAvailable()}
+						<section>
+							<header>
+								<h3>Post-processor Artifacts</h3>
+							</header>
+							<div>
+								{#each postProcessorArtifacts as artifact (artifact.path)}
+									<div>
+										<div>
+											<span>{getBasename(artifact.path)}</span>
+											<button onclick={() => downloadJsonData(artifact.data, getBasename(artifact.path))}>
+												Download
+											</button>
+										</div>
+										{#if Array.isArray(artifact.data)}
+											<pre>{JSON.stringify(artifact.data, null, 2)}</pre>
+										{:else if typeof artifact.data === 'object' && artifact.data !== null}
+											<pre>{JSON.stringify(artifact.data, null, 2)}</pre>
+										{:else}
+											<pre>{JSON.stringify(artifact.data, null, 2)}</pre>
+										{/if}
+									</div>
+								{/each}
+							</div>
+						</section>
+					{/if}
+
+					{#if !execution.output_data && !postProcessorArtifactsLoading && !postProcessorGroupsAvailable() && !postProcessorArtifactsAvailable()}
 						<div>
 							No output data available
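The same flow can be exercised outside the UI. A minimal sketch assuming a locally running Studio server on port 8000 (hypothetical) and using the two endpoints the component calls:

```python
import requests

BASE = "http://localhost:8000"  # assumed local Studio server; adjust to your setup
execution_id = "exec-123"       # hypothetical execution id
workflow_id = "wf-456"          # hypothetical workflow id

# 1. Index the post-processor artifacts for an execution.
idx = requests.get(f"{BASE}/api/executions/{execution_id}/artifacts/post-processors").json()

# 2. Fetch each artifact file the same way fetchJsonFromPath() does.
for path in idx.get("files", []):
    resp = requests.get(
        f"{BASE}/api/media/file",
        params={"path": path, "workflow_id": workflow_id},
    )
    if resp.ok:
        data = resp.json()
        print(path, "->", type(data).__name__)

# The semantic-dedup report and final output, if present, are called out directly.
report = idx.get("semantic_dedup", {}).get("report_file")
if report:
    print("dedup report:", report)
```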