Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
401 changes: 374 additions & 27 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions auto-engine-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ anyhow = "1.0.100"
convert_case = "0.10.0"
oar-ocr = "0.3.1"
reqwest = { version = "0.12.9", default-features = false, features = ["json", "rustls-tls"] }
rmcp = { version = "0.12.0", features = ["transport-streamable-http-server-session", "transport-streamable-http-server"] }
http = "1.4.0"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
axum = "0.8.8"
toml = "0.9.10"
async-openai = { version = "0.32.2", features = ["responses", "completions", "chat-completion"] }
secrecy = "0.10"
bytes = "1.11.0"

[features]
default = ["types", "context", "event", "pipeline", "runner", "utils"]
Expand Down
59 changes: 43 additions & 16 deletions auto-engine-core/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use crate::utils;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tauri::async_runtime::RwLock;
use tauri::Manager;
use tauri::async_runtime::RwLock;

/// A named value stored in the workflow `Context`, pairing the JSON payload
/// with a human-readable description of what it represents.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValueItem {
    // Free-text description supplied by the caller of `Context::set_value`.
    pub description: String,
    // Arbitrary JSON payload produced or consumed by workflow nodes.
    pub value: serde_json::Value,
}

#[derive(Debug)]
pub struct Context {
pub string_value: Arc<RwLock<HashMap<String, serde_json::Value>>>,
pub value: Arc<RwLock<HashMap<String, ValueItem>>>,
pub(crate) screen_scale: f64,
pub(crate) pipeline_path: PathBuf,
pub(crate) workflow_path: PathBuf,
Expand All @@ -20,7 +26,7 @@ impl Context {
#[cfg(feature = "tauri")]
pub fn new(path: PathBuf, app_handle: Option<tauri::AppHandle>) -> Self {
Self {
string_value: Arc::new(RwLock::new(HashMap::new())),
value: Arc::new(RwLock::new(HashMap::new())),
screen_scale: 1.0,
pipeline_path: path.clone(),
workflow_path: path.clone(),
Expand All @@ -31,7 +37,7 @@ impl Context {
#[cfg(not(feature = "tauri"))]
pub fn new(path: PathBuf) -> Self {
Self {
string_value: Arc::new(RwLock::new(HashMap::new())),
value: Arc::new(RwLock::new(HashMap::new())),
screen_scale: 1.0,
pipeline_path: path.clone(),
workflow_path: path.clone(),
Expand All @@ -44,20 +50,32 @@ impl Context {
}

pub async fn set_string_value(&self, key: &str, value: &str) -> Result<(), String> {
self.set_value::<String>(key, value.to_string()).await
self.set_value::<String>(key, value.to_string(), String::new())
.await
}

pub async fn set_value<T: Serialize>(&self, key: &str, value: T) -> Result<(), String> {
let mut map = self.string_value.write().await;
pub async fn set_value<T: Serialize>(
&self,
key: &str,
value: T,
description: String,
) -> Result<(), String> {
let mut map = self.value.write().await;
map.insert(
key.to_string(),
serde_json::to_value(value).map_err(|e| format!("{:?}", e))?,
ValueItem {
description,
value: serde_json::to_value(value).map_err(|e| format!("{:?}", e))?,
},
);
Ok(())
}
pub async fn get_value(&self, key: &str) -> Option<serde_json::Value> {
let map = self.string_value.read().await;
map.get(key).cloned()
let map = self.value.read().await;
if let Some(item) = map.get(key).cloned() {
return Some(item.value);
}
None
}

pub async fn get_value_parse(&self, key: &str) -> Option<serde_json::Value> {
Expand All @@ -80,21 +98,30 @@ impl Context {
default_value
}

pub fn load_image_path(&self, image: &str) -> Result<PathBuf, String> {
pub fn path_image(&self, image: &str) -> Result<PathBuf, String> {
let image_path = self.workflow_path.join("images").join(image);
if !image_path.exists() {
return Err(format!("Image {} does not exist", image));
}
Ok(image_path)
}

pub fn resource_path(&self) -> PathBuf {
if let Some(handle) = self.app_handle.clone(){
pub fn path_resource(&self) -> PathBuf {
if let Some(handle) = self.app_handle.clone() {
if cfg!(debug_assertions) {
return PathBuf::from("")
return PathBuf::from("");
}
return handle.path().resource_dir().unwrap().to_path_buf()
return handle.path().resource_dir().unwrap().to_path_buf();
}
PathBuf::from("")
}

pub async fn values(&self) -> HashMap<String, ValueItem> {
let mut res: HashMap<String, ValueItem> = HashMap::new();
let map = self.value.read().await;
map.iter().for_each(|(k, v)| {
res.insert(k.clone(), v.clone());
});
res
}
}
5 changes: 5 additions & 0 deletions auto-engine-core/src/event/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ impl NodeEventPayload {
pub fn running(name: String) -> NodeEventPayload {
NodeEventPayload::new::<String>("running".to_string(), name, None)
}

/// Builds a "running" event that also carries a serializable result payload,
/// unlike `running`, which emits the same status with no payload.
pub fn running_with_payload<D: Serialize>(name: String, result: Option<D>) -> NodeEventPayload {
    NodeEventPayload::new::<D>("running".to_string(), name, result)
}

pub fn waiting(name: String) -> NodeEventPayload {
NodeEventPayload::new::<String>("waiting".to_string(), name, None)
}
Expand Down
1 change: 1 addition & 0 deletions auto-engine-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ pub mod plugin;

mod action;
pub mod notification;
pub mod mcp;
4 changes: 4 additions & 0 deletions auto-engine-core/src/mcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// MCP (Model Context Protocol) integration for the engine.
pub mod server; // public entry point: the MCP server itself
mod state; // crate-internal: server state (provides `McpState`)
mod tool; // crate-internal: tool definitions and call builders
mod service; // crate-internal: HTTP service wiring (provides `McpServiceBuilder`)
162 changes: 162 additions & 0 deletions auto-engine-core/src/mcp/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use crate::mcp::service::McpServiceBuilder;
use crate::mcp::state::McpState;
use crate::mcp::tool::{ToolCallBuilder, ToolDefine};
use crate::node::start::node;
use crate::schema::workflow::WorkflowSchema;
use crate::types::workflow::WorkflowMetaData;
use rmcp::handler::server::wrapper::Parameters;
use schemars::{Schema, SchemaGenerator};
use serde_json::{Map, Value};
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

/// Accumulates parameter names and turns them into a JSON-Schema object in
/// which every collected parameter is a required string property.
struct ParamSchemaBuilder {
    // Names of the parameters collected so far (all treated as strings).
    params: Vec<String>,
}

impl ParamSchemaBuilder {
    /// Creates an empty builder with no parameters registered.
    fn new() -> ParamSchemaBuilder {
        ParamSchemaBuilder { params: vec![] }
    }

    /// Registers a parameter that will appear as a required string property.
    fn add_param(&mut self, name: String) {
        self.params.push(name);
    }

    /// Consumes the builder and produces a JSON-Schema `object` whose
    /// properties are the registered parameters, all typed `string` and all
    /// listed in `required`.
    fn build(self) -> Schema {
        // Shared `{"type": "string"}` node, cloned once per property.
        let mut type_string = Map::new();
        type_string.insert("type".to_string(), Value::String("string".to_string()));

        let mut params = Map::new();
        let mut required = Vec::with_capacity(self.params.len());

        for param in self.params {
            // Push the name into `required` first so `param` can then be
            // moved into the properties map — one clone instead of two.
            required.push(Value::String(param.clone()));
            params.insert(param, Value::Object(type_string.clone()));
        }

        let mut res = Map::new();
        res.insert("properties".to_string(), Value::Object(params));
        res.insert("type".to_string(), Value::String("object".to_string()));
        res.insert("required".to_string(), Value::Array(required));

        Schema::from(res)
    }
}

/// HTTP MCP server that publishes the workflows found in `workflow_dir`
/// as callable tools.
pub struct McpServer {
    // Directory scanned for workflow packages (see `load_tools`).
    workflow_dir: PathBuf,
    // Cancellation token used by `stop` to shut the running service down.
    token: CancellationToken,
}

impl McpServer {
    /// Creates a server that will expose the workflows under `workflow_dir`.
    pub fn new(workflow_dir: PathBuf) -> Self {
        McpServer {
            workflow_dir,
            token: Default::default(),
        }
    }

    /// Scans `workflow_dir` and converts each workflow package into an MCP
    /// tool definition.
    ///
    /// A workflow package is a sub-directory containing `_meta.toml`
    /// (name/description metadata) and `workflow.yaml` (the node graph).
    /// String entries under the start node's `params` input become required
    /// string parameters of the generated tool schema.
    ///
    /// Returns an empty list when the directory does not exist; any I/O or
    /// parse failure is reported as a `String` error.
    pub fn load_tools(&self) -> Result<Vec<ToolDefine<McpState>>, String> {
        if !self.workflow_dir.exists() {
            return Ok(vec![]);
        }

        let mut workflows = vec![];

        for entry in self.workflow_dir.read_dir().map_err(|e| e.to_string())? {
            let entry = entry.map_err(|e| e.to_string())?;
            let path = entry.path();

            // Only directories carrying a `_meta.toml` are workflow packages;
            // anything else is silently skipped.
            if !path.is_dir() {
                continue;
            }
            let meta_path = path.join("_meta.toml");
            if !meta_path.exists() {
                continue;
            }

            let meta_content = fs::read_to_string(&meta_path).map_err(|e| e.to_string())?;
            let meta = toml::from_str::<WorkflowMetaData>(meta_content.as_str())
                .map_err(|e| e.to_string())?;

            let workflow_content =
                fs::read_to_string(path.join("workflow.yaml")).map_err(|e| e.to_string())?;
            let schema = serde_yaml::from_str::<WorkflowSchema>(&workflow_content)
                .map_err(|e| e.to_string())?;

            // Collect the start node's string-valued `params` entries as
            // tool parameters.
            let mut builder = ParamSchemaBuilder::new();
            for node in schema.nodes {
                if node.action_type != node::START_NODE_TYPE {
                    continue;
                }
                // `unwrap_or_default` avoids eagerly allocating a map when
                // `input_data` is `Some` (was `unwrap_or(HashMap::new())`).
                if let Some(value) = node.input_data.unwrap_or_default().get("params") {
                    if let Some(obj) = value.as_object() {
                        for (key, value) in obj {
                            if value.is_string() {
                                builder.add_param(key.clone());
                            }
                        }
                    }
                }
            }

            workflows.push((meta, builder.build()));
        }

        let mut tools = vec![];

        for (workflow, param) in workflows {
            let tool = ToolCallBuilder::new(
                workflow
                    .name
                    .ok_or_else(|| "No name found in metadata".to_string())?,
            )
            .with_description(
                workflow
                    .description
                    .ok_or_else(|| "No description found in metadata".to_string())?,
            )
            .with_input_schema(param)
            // TODO(review): placeholder handler — it does not execute the
            // workflow yet, it only returns a fixed string.
            .with_call_func(Arc::new(|_ctx, _params: HashMap<String, Value>| {
                Box::pin(async move { Ok(serde_json::Value::String("Hello !".to_string())) })
            }))
            .build();
            tools.push(tool);
        }

        Ok(tools)
    }

    /// Builds the MCP service from the loaded tools and serves it until the
    /// cancellation token fires (see [`Self::stop`]).
    pub async fn run(&self) -> Result<(), String> {
        let tools = self.load_tools()?;

        let service = McpServiceBuilder::new()
            .with_port(8080)
            .with_tool_calls(tools)
            .build();

        // The path logged here is the workflow source directory; the service
        // itself binds port 8080 (the previous "listening on" wording
        // misleadingly suggested this was the listen address).
        log::info!(
            "MCP server starting, serving workflows from {}",
            self.workflow_dir.display()
        );
        tracing::info!("MCP server started");
        service
            .run(self.token.clone())
            .await
            .map_err(|e| e.to_string())?;

        Ok(())
    }

    /// Cancels the running service and installs a fresh token so the server
    /// can be started again afterwards.
    pub async fn stop(&mut self) {
        self.token.cancel();
        self.token = Default::default();
    }

    /// Stops any running service, then starts a new one.
    pub async fn restart(&mut self) -> Result<(), String> {
        self.stop().await;
        self.run().await
    }
}
Loading
Loading