113 changes: 99 additions & 14 deletions studio/api.py
@@ -63,7 +63,7 @@ def _convert_prompts_to_yaml_format(prompts: List[Dict[str, Any]]) -> List[Dict[

    Frontend format: [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}]
    SyGra YAML format: [{"system": "..."}, {"user": "..."}]

    Multi-modal frontend format: [{"role": "user", "content": [{"type": "text", "text": "..."}, {"type": "audio_url", "audio_url": "..."}]}]
    Multi-modal YAML format: [{"user": [{"type": "text", "text": "..."}, {"type": "audio_url", "audio_url": "..."}]}]
    """
@@ -1039,19 +1039,19 @@ async def get_workflow_sample_data(workflow_id: str, limit: int = 3, source_inde
    table = source.get("table")
    fields = source.get("fields", [])
    alias = source.get("alias")

    if not table:
        return {
            "records": [],
            "total": 0,
            "source_type": "servicenow",
            "message": "No table specified for ServiceNow source"
        }

    try:
        from sygra.core.dataset.servicenow_handler import ServiceNowHandler
        from sygra.core.dataset.dataset_config import DataSourceConfig as SygraDataSourceConfig

        # Build source config for ServiceNowHandler
        snow_config = SygraDataSourceConfig(
            type="servicenow",
@@ -1062,10 +1062,10 @@ async def get_workflow_sample_data(workflow_id: str, limit: int = 3, source_inde
            order_by=source.get("order_by"),
            order_desc=source.get("order_desc", False),
        )

        handler = ServiceNowHandler(source_config=snow_config)
        raw_records = handler.read()

        # Flatten records (ServiceNow returns value/display_value dicts)
        records = []
        for record in raw_records[:limit]:
@@ -1076,7 +1076,7 @@ async def get_workflow_sample_data(workflow_id: str, limit: int = 3, source_inde
                else:
                    flat_record[key] = value
            records.append(flat_record)

        total_count = len(raw_records)
        return {
            "records": records,
@@ -1086,7 +1086,7 @@ async def get_workflow_sample_data(workflow_id: str, limit: int = 3, source_inde
"alias": alias,
"message": None
}

except Exception as snow_err:
# Fall back to showing configured fields if connection fails
error_msg = str(snow_err)
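        # NOTE (annotation, not part of this diff): when display values are
        # requested, ServiceNow's Table API returns each field as a dict such as
        #     {"state": {"value": "2", "display_value": "In Progress"}}
        # The loop above collapses that to one scalar per field; the branch that
        # chooses between "value" and "display_value" is elided in this excerpt.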
@@ -1446,6 +1446,91 @@ async def get_execution(execution_id: str):

    raise HTTPException(status_code=404, detail=f"Execution {execution_id} not found")

@app.get("/api/executions/{execution_id}/artifacts/post-processors")
async def get_post_processor_artifacts(execution_id: str):
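    """
    List artifact files that post-processors wrote alongside an execution's
    main output file, and surface the semantic-dedup report and final output
    if present.
    """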
    if execution_id in _executions:
        execution = _executions[execution_id]
    else:
        storage = _get_execution_storage()
        execution = storage.get_execution(execution_id)
        if not execution:
            raise HTTPException(status_code=404, detail=f"Execution {execution_id} not found")

    output_file = execution.output_file
    if not output_file and isinstance(execution.metadata, dict):
        output_file = execution.metadata.get("output_file")

    if not output_file:
        return {"files": []}

    output_path = Path(output_file)
    if not output_path.is_absolute():
        output_path = (Path.cwd() / output_path).resolve()
    else:
        output_path = output_path.resolve()

    out_dir = output_path.parent
    out_name = output_path.name

    output_ts = None
    run_prefix = None

    if "_output_" in out_name and out_name.endswith(".json"):
        run_prefix, remainder = out_name.split("_output_", 1)
        output_ts = remainder[:-5]
    elif out_name.startswith("output_") and out_name.endswith(".json"):
        output_ts = out_name[len("output_"):-5]
    matches: set[Path] = set()

    if run_prefix:
        matches.update(p for p in out_dir.glob(f"{run_prefix}_*.json") if p.is_file())

    if output_ts:
        matches.update(p for p in out_dir.glob(f"*_{output_ts}.json") if p.is_file())

    def _is_main_output_file(p: Path) -> bool:
        name = p.name
        return (
            ("_output_" in name and name.endswith(".json"))
            or (name.startswith("output_") and name.endswith(".json"))
        )

    files: list[Path] = []
    for p in matches:
        if p.resolve() == output_path.resolve():
            continue
        if _is_main_output_file(p):
            continue
        files.append(p)

    files.sort(key=lambda x: x.name)

    def _as_return_path(p: Path) -> str:
        try:
            return str(p.resolve().relative_to(Path.cwd().resolve()))
        except Exception:
            return str(p.resolve())

    semantic_dedup_report: Optional[Path] = None
    semantic_dedup_final: Optional[Path] = None
    for p in files:
        name = p.name
        if semantic_dedup_report is None and "semantic_dedup_report_" in name:
            semantic_dedup_report = p
        if semantic_dedup_final is None and "SemanticDedupPostProcessor_" in name:
            semantic_dedup_final = p
        if semantic_dedup_report is not None and semantic_dedup_final is not None:
            break

    return {
        "files": [_as_return_path(p) for p in files],
        "semantic_dedup": {
            "report_file": _as_return_path(semantic_dedup_report) if semantic_dedup_report else None,
            "final_output_data_file": _as_return_path(semantic_dedup_final) if semantic_dedup_final else None,
        },
    }
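    # NOTE (annotation, not part of this diff): a sketch of the response shape,
    # assuming hypothetical artifact names sitting next to the main output:
    #
    #     GET /api/executions/abc123/artifacts/post-processors
    #     {
    #       "files": [
    #         "output/SemanticDedupPostProcessor_20240101_120000.json",
    #         "output/semantic_dedup_report_20240101_120000.json"
    #       ],
    #       "semantic_dedup": {
    #         "report_file": "output/semantic_dedup_report_20240101_120000.json",
    #         "final_output_data_file": "output/SemanticDedupPostProcessor_20240101_120000.json"
    #       }
    #     }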

@app.post("/api/executions/{execution_id}/cancel")
async def cancel_execution(execution_id: str):
"""
@@ -2048,7 +2133,7 @@ async def get_node_code(workflow_id: str, node_id: str, code_type: str):
    if node_config:
        # Check for class path in node metadata (original_config from YAML)
        original_config = node_config.metadata.get("original_config", {}) if node_config.metadata else {}

        if code_type == 'pre_process':
            class_path = original_config.get("pre_process") or getattr(node_config, "pre_process", None)
        elif code_type == 'post_process':
@@ -2061,13 +2146,13 @@ async def get_node_code(workflow_id: str, node_id: str, code_type: str):
            # Extract class name from path (e.g., "tasks.examples.foo.task_executor.MyClass" -> "MyClass")
            parts = class_path.split(".")
            target_class_name = parts[-1] if parts else None

            # Determine the file path from the module path
            # e.g., "tasks.examples.foo.task_executor.MyClass" -> "tasks/examples/foo/task_executor.py"
            if len(parts) > 1:
                module_parts = parts[:-1]  # Everything except the class name
                relative_path = "/".join(module_parts) + ".py"

                # Try to find the file relative to the project root
                # First try: relative to workflow directory's parent (tasks folder level)
                potential_paths = [
@@ -2076,7 +2161,7 @@ async def get_node_code(workflow_id: str, node_id: str, code_type: str):
                    workflow_dir.parent.parent / relative_path,
                    Path.cwd() / relative_path,
                ]

                for potential_path in potential_paths:
                    if potential_path.exists():
                        target_file_path = potential_path
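                # NOTE (annotation, not part of this diff): for a class_path of
                # "tasks.examples.foo.task_executor.MyClass", each root above is
                # probed for tasks/examples/foo/task_executor.py; the loop body
                # continues past this excerpt once a candidate exists.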
@@ -6339,7 +6424,7 @@ def _get_node_code_from_file(content: str, node_id: str, code_type: str, target_
        # 2. Otherwise, fall back to node_id-based matching
        if detected_type == code_type:
            match_found = False

            # Priority 1: Match by specific class name from YAML config
            if target_class_name and class_name == target_class_name:
                match_found = True
@@ -6348,7 +6433,7 @@ def _get_node_code_from_file(content: str, node_id: str, code_type: str, target_
                normalized_class_id = re.sub(r'[^a-zA-Z0-9_]', '', class_safe_id.replace('-', '_'))
                if normalized_class_id == safe_node_id or class_name == expected_name:
                    match_found = True
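                # NOTE (annotation, not part of this diff): the normalization
                # above maps e.g. a class_safe_id of "my-node" to "my_node" so
                # it can be compared against the underscore-safe node id.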

            if match_found:
                start_line = node.lineno - 1  # 0-indexed
                end_line = node.end_lineno if hasattr(node, 'end_lineno') else start_line + 1