diff --git a/docs/plugins/core-engines-configuration.md b/docs/plugins/core-engines-configuration.md new file mode 100644 index 0000000000..758efd9de9 --- /dev/null +++ b/docs/plugins/core-engines-configuration.md @@ -0,0 +1,135 @@ +# EventMesh Core Engines Configuration Guide + +EventMesh provides powerful core engines (`Filter`, `Transformer`, `Router`) to dynamically process messages. These engines are configured via **MetaStorage** (Governance Center, e.g., Nacos, Etcd), supporting on-demand loading and hot-reloading. + +## 0. Core Concepts + +Before configuration, it is important to understand the specific role of each engine in the message flow: + +* **Filter (The Gatekeeper)**: Decides **"Whether to pass"**. + * It inspects the message (CloudEvent) attributes. If the message matches the rules, it passes; otherwise, it is dropped. + * *Use Case*: Block debug logs from production traffic; Only subscribe to specific event types. + +* **Transformer (The Translator)**: Decides **"What it looks like"**. + * It modifies the message content (Payload or Metadata) according to templates or scripts. + * *Use Case*: Convert XML to JSON; Mask sensitive data (PII); Adapt legacy protocols to new standards. + +* **Router (The Dispatcher)**: Decides **"Where to go"**. + * It dynamically changes the destination (Topic) of the message. + * *Use Case*: Route traffic to a Canary/Gray release topic; Route high-priority orders to a dedicated queue. + +--- + +## 1. Overview + +The configuration is not in local property files but distributed via the MetaStorage. EventMesh listens to specific **Keys** based on client Groups. + +- **Data Source**: Configured via `eventMesh.metaStorage.plugin.type`. +- **Loading Mechanism**: Lazy loading & Hot-reloading. +- **Key Format**: `{EnginePrefix}-{GroupName}`. +- **Value Format**: JSON Array. + +| Engine | Prefix | Scope | Description | +| :--- | :--- | :--- | :--- | +| **Router** | `router-` | Pub Only | Routes messages to different topics. | +| **Filter** | `filter-` | Pub & Sub | Filters messages based on CloudEvent attributes. | +| **Transformer** | `transformer-` | Pub & Sub | Transforms message content (Payload/Header). | + +--- + +## 2. Router (Routing) + +**Scope**: Publish Only (Upstream) +**Key**: `router-{producerGroup}` + +Decides the target storage topic for a message sent by a producer. + +### Configuration Example (JSON) + +```json +[ + { + "topic": "original-topic", + "routerConfig": { + "targetTopic": "redirect-topic", + "expression": "data.type == 'urgent'" + } + } +] +``` + +* **topic**: The original topic the producer sends to. +* **targetTopic**: The actual topic to write to Storage. +* **expression**: Condition to trigger routing (e.g., SpEL). + +--- + +## 3. Filter (Filtering) + +**Scope**: Both Publish (Upstream) & Subscribe (Downstream) + +### A. Publish Side (Upstream) +**Key**: `filter-{producerGroup}` +**Effect**: Intercepts messages **before** they are sent to Storage. + +### B. Subscribe Side (Downstream) +**Key**: `filter-{consumerGroup}` +**Effect**: Intercepts messages **before** they are pushed to the Consumer. + +### Configuration Example (JSON) + +```json +[ + { + "topic": "test-topic", + "filterPattern": { + "source": ["app-a", "app-b"], + "type": [{"prefix": "com.example"}] + } + } +] +``` + +* **filterPattern**: Rules matching CloudEvent attributes. If a message doesn't match, it is dropped. + +--- + +## 4. Transformer (Transformation) + +**Scope**: Both Publish (Upstream) & Subscribe (Downstream) + +### A. Publish Side (Upstream) +**Key**: `transformer-{producerGroup}` +**Effect**: Modifies message content **before** sending to Storage. + +### B. Subscribe Side (Downstream) +**Key**: `transformer-{consumerGroup}` +**Effect**: Modifies message content **before** pushing to the Consumer. + +### Configuration Example (JSON) + +```json +[ + { + "topic": "raw-topic", + "transformerConfig": { + "transformerType": "template", + "template": "{\"id\": \"${id}\", \"new_content\": \"${data.content}\"}" + } + } +] +``` + +* **transformerType**: e.g., `original`, `template`. +* **template**: The transformation template definition. + +--- + +## 5. Verification + +1. **Publish Config**: Add the JSON config to your Governance Center (e.g., Nacos) with the Data ID `router-MyGroup`. +2. **Send Message**: Use EventMesh SDK to send a message from `MyGroup`. +3. **Observe**: + * For **Router**: Check if the message appears in the `targetTopic` in your MQ. + * For **Filter**: Check if blocked messages are skipped. + * For **Transformer**: Check if the message body in MQ (for Pub) or Consumer (for Sub) is modified. diff --git a/eventmesh-examples/build.gradle b/eventmesh-examples/build.gradle index bd90b83495..5815a1b9b7 100644 --- a/eventmesh-examples/build.gradle +++ b/eventmesh-examples/build.gradle @@ -21,6 +21,7 @@ dependencies { implementation project(":eventmesh-sdks:eventmesh-sdk-java") implementation project(":eventmesh-common") implementation project(":eventmesh-storage-plugin:eventmesh-storage-api") + implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-a2a") implementation project(":eventmesh-connectors:eventmesh-connector-spring") implementation('org.springframework.boot:spring-boot-starter-web') { exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging' diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/a2a/demo/AgentCardDemo.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/a2a/demo/AgentCardDemo.java new file mode 100644 index 0000000000..51b816814e --- /dev/null +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/a2a/demo/AgentCardDemo.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.a2a.demo; + +import org.apache.eventmesh.a2a.demo.A2AAbstractDemo; +import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig; +import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer; +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.ExampleConstants; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.protocol.a2a.A2AProtocolConstants; +import org.apache.eventmesh.protocol.a2a.AgentIdentity; +import org.apache.eventmesh.protocol.a2a.model.AgentCapabilities; +import org.apache.eventmesh.protocol.a2a.model.AgentCard; +import org.apache.eventmesh.protocol.a2a.model.AgentInterface; +import org.apache.eventmesh.protocol.a2a.model.AgentProvider; +import org.apache.eventmesh.protocol.a2a.model.AgentSkill; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; + +import lombok.extern.slf4j.Slf4j; + +/** + * Demo showing A2A Agent Card registration, discovery, and deletion via EventMesh. + */ +@Slf4j +public class AgentCardDemo extends A2AAbstractDemo { + + public static void main(String[] args) throws Exception { + EventMeshHttpClientConfig config = initEventMeshHttpClientConfig("a2a-agent-card-demo"); + try (EventMeshHttpProducer producer = new EventMeshHttpProducer(config)) { + + // 1. Register an Agent Card + registerAgentCard(producer, "my.org", "my.unit", "weather-agent"); + + // 2. List Agent Cards + listAgentCards(producer); + + // 3. Get specific Agent Card + getAgentCard(producer, "my.org", "my.unit", "weather-agent"); + + // 4. Delete Agent Card + deleteAgentCard(producer, "my.org", "my.unit", "weather-agent"); + + log.info("AgentCardDemo completed."); + } + } + + private static void registerAgentCard(EventMeshHttpProducer producer, + String orgId, String unitId, String agentId) throws Exception { + AgentCard card = buildSampleCard(agentId); + + Map params = new HashMap<>(); + params.put("org_id", orgId); + params.put("unit_id", unitId); + params.put("agent_id", agentId); + params.put("card", card); + + Map request = new HashMap<>(); + request.put("jsonrpc", "2.0"); + request.put("method", A2AProtocolConstants.OP_REGISTER_AGENT_CARD); + request.put("params", params); + request.put("id", UUID.randomUUID().toString()); + + CloudEvent event = buildA2ACardEvent(request); + producer.publish(event); + log.info("Registered agent card: {}/{}/{}", orgId, unitId, agentId); + } + + private static void listAgentCards(EventMeshHttpProducer producer) throws Exception { + Map params = new HashMap<>(); + params.put("org_id", "my.org"); + + Map request = new HashMap<>(); + request.put("jsonrpc", "2.0"); + request.put("method", A2AProtocolConstants.OP_LIST_AGENT_CARDS); + request.put("params", params); + request.put("id", UUID.randomUUID().toString()); + + CloudEvent event = buildA2ACardEvent(request); + producer.publish(event); + log.info("Listed agent cards for org: my.org"); + } + + private static void getAgentCard(EventMeshHttpProducer producer, + String orgId, String unitId, String agentId) throws Exception { + Map params = new HashMap<>(); + params.put("org_id", orgId); + params.put("unit_id", unitId); + params.put("agent_id", agentId); + + Map request = new HashMap<>(); + request.put("jsonrpc", "2.0"); + request.put("method", A2AProtocolConstants.OP_GET_AGENT_CARD); + request.put("params", params); + request.put("id", UUID.randomUUID().toString()); + + CloudEvent event = buildA2ACardEvent(request); + producer.publish(event); + log.info("Got agent card: {}/{}/{}", orgId, unitId, agentId); + } + + private static void deleteAgentCard(EventMeshHttpProducer producer, + String orgId, String unitId, String agentId) throws Exception { + Map params = new HashMap<>(); + params.put("org_id", orgId); + params.put("unit_id", unitId); + params.put("agent_id", agentId); + + Map request = new HashMap<>(); + request.put("jsonrpc", "2.0"); + request.put("method", A2AProtocolConstants.OP_DELETE_AGENT_CARD); + request.put("params", params); + request.put("id", UUID.randomUUID().toString()); + + CloudEvent event = buildA2ACardEvent(request); + producer.publish(event); + log.info("Deleted agent card: {}/{}/{}", orgId, unitId, agentId); + } + + private static AgentCard buildSampleCard(String agentId) { + AgentInterface iface = AgentInterface.builder() + .url("http://localhost:8080/a2a") + .protocolBinding("JSONRPC") + .protocolVersion(A2AProtocolConstants.PROTOCOL_VERSION) + .build(); + + AgentProvider provider = AgentProvider.builder() + .url("https://example.org") + .organization("Example Org") + .build(); + + AgentCapabilities capabilities = AgentCapabilities.builder() + .streaming(true) + .pushNotifications(true) + .build(); + + AgentSkill skill = AgentSkill.builder() + .id("weather-query") + .name("Weather Query") + .description("Queries weather information for a given location") + .tags(Arrays.asList("weather", "query")) + .examples(Arrays.asList("What's the weather in Beijing?")) + .build(); + + return AgentCard.builder() + .name(agentId) + .description("A weather query agent") + .version("1.0.0") + .supportedInterfaces(Collections.singletonList(iface)) + .provider(provider) + .capabilities(capabilities) + .skills(Collections.singletonList(skill)) + .defaultInputModes(Collections.singletonList("text/plain")) + .defaultOutputModes(Collections.singletonList("text/plain")) + .build(); + } + + private static CloudEvent buildA2ACardEvent(Map jsonRpcBody) { + String content = JsonUtils.toJSONString(jsonRpcBody); + String method = (String) jsonRpcBody.get("method"); + String ceType = A2AProtocolConstants.CE_TYPE_PREFIX + method.replace("/", ".") + ".req"; + + return CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("a2a-agent-card-demo")) + .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE) + .withType(ceType) + .withData(content.getBytes(StandardCharsets.UTF_8)) + .withExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL, "A2A") + .withExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL_VERSION, "2.0") + .withExtension(A2AProtocolConstants.CE_EXTENSION_A2A_METHOD, method) + .withExtension(A2AProtocolConstants.CE_EXTENSION_MCP_TYPE, "request") + .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4_000)) + .build(); + } +} diff --git a/eventmesh-function/eventmesh-function-router/src/test/java/org/apache/eventmesh/function/router/RouterBuilderTest.java b/eventmesh-function/eventmesh-function-router/src/test/java/org/apache/eventmesh/function/router/RouterBuilderTest.java new file mode 100644 index 0000000000..30af93f185 --- /dev/null +++ b/eventmesh-function/eventmesh-function-router/src/test/java/org/apache/eventmesh/function/router/RouterBuilderTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.function.router; + +import org.apache.eventmesh.function.api.Router; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class RouterBuilderTest { + + @Test + public void testBuild() { + String targetTopic = "targetTopic"; + Router router = RouterBuilder.build(targetTopic); + Assertions.assertNotNull(router); + Assertions.assertEquals(targetTopic, router.route("{}")); + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/build.gradle b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/build.gradle index 877ec19a7a..440788eeaf 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/build.gradle +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/build.gradle @@ -24,6 +24,7 @@ dependencies { implementation "io.cloudevents:cloudevents-core" implementation "io.cloudevents:cloudevents-json-jackson" implementation "com.fasterxml.jackson.core:jackson-databind" + implementation "com.networknt:json-schema-validator:1.5.6" implementation "org.slf4j:slf4j-api" compileOnly 'org.projectlombok:lombok' diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/A2AProtocolConstants.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/A2AProtocolConstants.java index 3db2b73545..1c168e85d8 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/A2AProtocolConstants.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/A2AProtocolConstants.java @@ -18,29 +18,62 @@ package org.apache.eventmesh.protocol.a2a; /** - * Standard Operations defined by a2a-protocol.org Specification. + * Standard Operations and Constants defined by A2A Protocol Specification. * Reference: https://a2a-protocol.org/latest/specification/#3-a2a-protocol-operations */ public class A2AProtocolConstants { - + + // Protocol version (per A2A spec) + public static final String PROTOCOL_VERSION = "0.3"; + // Core Messaging public static final String OP_SEND_MESSAGE = "message/send"; public static final String OP_SEND_STREAMING_MESSAGE = "message/sendStream"; - + // Task Management public static final String OP_GET_TASK = "task/get"; public static final String OP_LIST_TASKS = "task/list"; public static final String OP_CANCEL_TASK = "task/cancel"; public static final String OP_SUBSCRIBE_TASK = "task/subscribe"; - + // Notifications public static final String OP_NOTIFICATION_CONFIG_SET = "notification/config/set"; public static final String OP_NOTIFICATION_CONFIG_GET = "notification/config/get"; public static final String OP_NOTIFICATION_CONFIG_LIST = "notification/config/list"; public static final String OP_NOTIFICATION_CONFIG_DELETE = "notification/config/delete"; - - // Discovery + + // Agent Card / Discovery public static final String OP_GET_AGENT_CARD = "agent/card/get"; + public static final String OP_REGISTER_AGENT_CARD = "agent/card/register"; + public static final String OP_DELETE_AGENT_CARD = "agent/card/delete"; + public static final String OP_LIST_AGENT_CARDS = "agent/card/list"; + public static final String OP_UPDATE_AGENT_CARD = "agent/card/update"; + + // Agent Status + public static final String STATUS_ONLINE = "online"; + public static final String STATUS_OFFLINE = "offline"; + + // CloudEvent extension keys (matching EMQX conventions) + public static final String CE_EXTENSION_A2A_STATUS = "a2astatus"; + public static final String CE_EXTENSION_A2A_STATUS_SOURCE = "a2astatussource"; + public static final String CE_EXTENSION_TARGET_AGENT = "targetagent"; + public static final String CE_EXTENSION_A2A_METHOD = "a2amethod"; + public static final String CE_EXTENSION_COLLABORATION_ID = "collaborationid"; + public static final String CE_EXTENSION_MCP_TYPE = "mcptype"; + public static final String CE_EXTENSION_PROTOCOL = "protocol"; + public static final String CE_EXTENSION_PROTOCOL_VERSION = "protocolversion"; + public static final String CE_EXTENSION_SEQ = "seq"; + + // Discovery topic components + public static final String TOPIC_NAMESPACE = "a2a"; + public static final String TOPIC_VERSION = "v1"; + public static final String TOPIC_DISCOVERY = "discovery"; + + // ID validation pattern (matching EMQX: ^[A-Za-z0-9._-]+$) + public static final String SEGMENT_ID_PATTERN = "^[A-Za-z0-9._-]+$"; + + // CloudEvent type prefix + public static final String CE_TYPE_PREFIX = "org.apache.eventmesh.a2a."; /** * Checks if the method is a standard A2A Protocol operation. @@ -54,4 +87,14 @@ public static boolean isStandardOperation(String method) { || method.startsWith("notification/") || method.startsWith("agent/"); } -} \ No newline at end of file + + /** + * Checks if the method is an Agent Card operation. + */ + public static boolean isAgentCardOperation(String method) { + if (method == null) { + return false; + } + return method.startsWith("agent/card"); + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/AgentCardValidator.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/AgentCardValidator.java new file mode 100644 index 0000000000..4b8f256bf9 --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/AgentCardValidator.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.networknt.schema.JsonSchema; +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import com.networknt.schema.ValidationMessage; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Set; + +/** + * Validates Agent Card JSON against the A2A Agent Card JSON Schema. + */ +@Slf4j +public class AgentCardValidator { + + private static final String SCHEMA_RESOURCE = "/a2a/agent_card_schema.json"; + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private final JsonSchema schema; + private final boolean schemaValidationEnabled; + + public AgentCardValidator(boolean schemaValidationEnabled) { + this.schemaValidationEnabled = schemaValidationEnabled; + this.schema = loadSchema(); + } + + public AgentCardValidator() { + this(true); + } + + private JsonSchema loadSchema() { + try (InputStream is = AgentCardValidator.class.getResourceAsStream(SCHEMA_RESOURCE)) { + if (is == null) { + log.warn("Agent card schema resource not found: {}, schema validation will be disabled", SCHEMA_RESOURCE); + return null; + } + JsonNode schemaNode = objectMapper.readTree(is); + JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V6); + return factory.getSchema(schemaNode); + } catch (IOException e) { + log.warn("Failed to load agent card schema, validation will be disabled: {}", e.getMessage()); + return null; + } + } + + /** + * Validates an Agent Card JSON string. + * + * @param cardJson the card JSON string + * @return ValidationResult with success/failure and error messages + */ + public ValidationResult validate(String cardJson) { + if (cardJson == null || cardJson.isEmpty()) { + return ValidationResult.failure("Card JSON is null or empty"); + } + + if (!schemaValidationEnabled || schema == null) { + // Only check it's valid JSON object + try { + JsonNode node = objectMapper.readTree(cardJson); + if (!node.isObject()) { + return ValidationResult.failure("Card must be a JSON object"); + } + return ValidationResult.success(); + } catch (Exception e) { + return ValidationResult.failure("Card is not valid JSON: " + e.getMessage()); + } + } + + try { + JsonNode cardNode = objectMapper.readTree(cardJson); + if (!cardNode.isObject()) { + return ValidationResult.failure("Card must be a JSON object"); + } + Set messages = schema.validate(cardNode); + if (messages.isEmpty()) { + return ValidationResult.success(); + } + StringBuilder sb = new StringBuilder("Card schema validation failed: "); + for (ValidationMessage msg : messages) { + sb.append(msg.getMessage()).append("; "); + } + return ValidationResult.failure(sb.toString()); + } catch (IOException e) { + return ValidationResult.failure("Failed to parse card JSON: " + e.getMessage()); + } + } + + /** + * Validates that an ID segment matches the allowed pattern. + */ + public static boolean validateId(String id) { + return id != null && id.matches(A2AProtocolConstants.SEGMENT_ID_PATTERN); + } + + public static class ValidationResult { + + private final boolean valid; + private final String errorMessage; + + private ValidationResult(boolean valid, String errorMessage) { + this.valid = valid; + this.errorMessage = errorMessage; + } + + public static ValidationResult success() { + return new ValidationResult(true, null); + } + + public static ValidationResult failure(String errorMessage) { + return new ValidationResult(false, errorMessage); + } + + public boolean isValid() { + return valid; + } + + public String getErrorMessage() { + return errorMessage; + } + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/AgentIdentity.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/AgentIdentity.java new file mode 100644 index 0000000000..9677e6d5e8 --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/AgentIdentity.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a; + +import java.io.Serializable; +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonInclude; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Represents the hierarchical identity of an A2A agent: org_id / unit_id / agent_id. + * Also provides discovery topic construction and parsing per the A2A protocol. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AgentIdentity implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final String GLOBAL_NAMESPACE = "global"; + public static final String TOPIC_NAMESPACE = "a2a"; + public static final String TOPIC_VERSION = "v1"; + public static final String TOPIC_DISCOVERY = "discovery"; + + private String orgId; + + private String unitId; + + private String agentId; + + private String namespace; + + public AgentIdentity(String orgId, String unitId, String agentId) { + this.orgId = orgId; + this.unitId = unitId; + this.agentId = agentId; + this.namespace = GLOBAL_NAMESPACE; + } + + /** + * Builds the discovery topic for this agent identity. + * Global namespace: a2a/v1/discovery/{orgId}/{unitId}/{agentId} + * Custom namespace: {namespace}/a2a/v1/discovery/{orgId}/{unitId}/{agentId} + */ + public String discoveryTopic() { + if (GLOBAL_NAMESPACE.equals(namespace)) { + return String.join("/", TOPIC_NAMESPACE, TOPIC_VERSION, TOPIC_DISCOVERY, orgId, unitId, agentId); + } + return String.join("/", namespace, TOPIC_NAMESPACE, TOPIC_VERSION, TOPIC_DISCOVERY, orgId, unitId, agentId); + } + + /** + * Parses a discovery topic string into an AgentIdentity. + * + * @param topic the discovery topic + * @return parsed AgentIdentity, or null if the topic does not match the expected pattern + */ + public static AgentIdentity fromDiscoveryTopic(String topic) { + if (topic == null) { + return null; + } + String[] parts = topic.split("/"); + // Global: a2a/v1/discovery/{org}/{unit}/{agent} = 6 parts + // Namespaced: {ns}/a2a/v1/discovery/{org}/{unit}/{agent} = 7 parts + if (parts.length == 6 + && TOPIC_NAMESPACE.equals(parts[0]) + && TOPIC_VERSION.equals(parts[1]) + && TOPIC_DISCOVERY.equals(parts[2])) { + return AgentIdentity.builder() + .namespace(GLOBAL_NAMESPACE) + .orgId(parts[3]) + .unitId(parts[4]) + .agentId(parts[5]) + .build(); + } + if (parts.length == 7 + && TOPIC_NAMESPACE.equals(parts[1]) + && TOPIC_VERSION.equals(parts[2]) + && TOPIC_DISCOVERY.equals(parts[3])) { + return AgentIdentity.builder() + .namespace(parts[0]) + .orgId(parts[4]) + .unitId(parts[5]) + .agentId(parts[6]) + .build(); + } + return null; + } + + /** + * Returns the composite client ID: orgId/unitId/agentId (matching EMQX's agent_card_clientid). + */ + public String clientId() { + return String.join("/", orgId, unitId, agentId); + } + + /** + * Validates that all ID segments match the allowed pattern: ^[A-Za-z0-9._-]+$ + */ + public boolean isValid() { + return isValidId(orgId) && isValidId(unitId) && isValidId(agentId); + } + + private static boolean isValidId(String id) { + return id != null && id.matches(A2AProtocolConstants.SEGMENT_ID_PATTERN); + } + + /** + * Checks if this identity matches a wildcard filter. + * Wildcard is represented by "+" (matching EMQX's MQTT wildcard convention). + */ + public boolean matchesFilter(String filterOrgId, String filterUnitId, String filterAgentId) { + return (filterOrgId == null || "+".equals(filterOrgId) || filterOrgId.equals(orgId)) + && (filterUnitId == null || "+".equals(filterUnitId) || filterUnitId.equals(unitId)) + && (filterAgentId == null || "+".equals(filterAgentId) || filterAgentId.equals(agentId)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AgentIdentity that = (AgentIdentity) o; + return Objects.equals(orgId, that.orgId) + && Objects.equals(unitId, that.unitId) + && Objects.equals(agentId, that.agentId) + && Objects.equals(namespace, that.namespace); + } + + @Override + public int hashCode() { + return Objects.hash(orgId, unitId, agentId, namespace); + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/EnhancedA2AProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/EnhancedA2AProtocolAdaptor.java index 336dc4fd2a..fa906f32c2 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/EnhancedA2AProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/EnhancedA2AProtocolAdaptor.java @@ -38,11 +38,13 @@ import lombok.extern.slf4j.Slf4j; /** - * Enhanced A2A Protocol Adaptor that implements MCP (Model Context Protocol) over CloudEvents. + * Enhanced A2A Protocol Adaptor that implements A2A (Agent-to-Agent) protocol over CloudEvents. * *

This adaptor supports: - * 1. Standard MCP JSON-RPC 2.0 messages. - * 2. Delegation to standard CloudEvents/HTTP protocols. + * 1. Standard A2A JSON-RPC 2.0 messages (messaging, task, notification, agent card operations). + * 2. Agent Card registration and discovery via discovery topics. + * 3. Agent status metadata augmentation. + * 4. Delegation to standard CloudEvents/HTTP protocols. */ @Slf4j public class EnhancedA2AProtocolAdaptor implements ProtocolAdaptor { @@ -52,14 +54,14 @@ public class EnhancedA2AProtocolAdaptor implements ProtocolAdaptor cloudEventsAdaptor; private ProtocolAdaptor httpAdaptor; private volatile boolean initialized = false; + private AgentCardValidator cardValidator; + public EnhancedA2AProtocolAdaptor() { - // Leverage existing protocol infrastructure with null checks try { this.cloudEventsAdaptor = ProtocolPluginFactory.getProtocolAdaptor("cloudevents"); } catch (Exception e) { @@ -78,7 +80,8 @@ public EnhancedA2AProtocolAdaptor() { @Override public void initialize() { if (!initialized) { - log.info("Initializing Enhanced A2A Protocol Adaptor v{} (MCP Support)", PROTOCOL_VERSION); + log.info("Initializing Enhanced A2A Protocol Adaptor v{} (Agent Card Registry Support)", PROTOCOL_VERSION); + this.cardValidator = new AgentCardValidator(true); if (cloudEventsAdaptor != null) { log.info("Leveraging CloudEvents adaptor: {}", cloudEventsAdaptor.getClass().getSimpleName()); } @@ -107,24 +110,23 @@ public CloudEvent toCloudEvent(ProtocolTransportObject protocol) throws Protocol // ignore } - // 1. Check for MCP / JSON-RPC 2.0 if (node != null && node.has("jsonrpc") && "2.0".equals(node.get("jsonrpc").asText())) { - return convertMcpToCloudEvent(node, content); + return convertA2AToCloudEvent(node, content); } - // 2. Delegation if (protocol.getClass().getName().contains("Http") && httpAdaptor != null) { return httpAdaptor.toCloudEvent(protocol); } else if (cloudEventsAdaptor != null) { return cloudEventsAdaptor.toCloudEvent(protocol); } else { - // Last resort: if it looks like JSON but missing headers, treat as MCP Request implicitly if it has 'method' if (node != null && node.has("method")) { - return convertMcpToCloudEvent(node, content); + return convertA2AToCloudEvent(node, content); } throw new ProtocolHandleException("Unknown protocol message format"); } + } catch (ProtocolHandleException e) { + throw e; } catch (Exception e) { throw new ProtocolHandleException("Failed to convert to CloudEvent", e); } @@ -143,12 +145,11 @@ public List toBatchCloudEvent(ProtocolTransportObject protocol) thro // ignore } - // Check if this is a Batch (JSON Array) if (node != null && node.isArray()) { List events = new ArrayList<>(); for (JsonNode item : node) { if (item.has("jsonrpc")) { - events.add(convertMcpToCloudEvent(item, item.toString())); + events.add(convertA2AToCloudEvent(item, item.toString())); } } if (!events.isEmpty()) { @@ -156,7 +157,6 @@ public List toBatchCloudEvent(ProtocolTransportObject protocol) thro } } - // Delegate if (cloudEventsAdaptor != null) { try { return cloudEventsAdaptor.toBatchCloudEvent(protocol); @@ -167,10 +167,11 @@ public List toBatchCloudEvent(ProtocolTransportObject protocol) thro } } - // Fallback CloudEvent single = toCloudEvent(protocol); return Collections.singletonList(single); + } catch (ProtocolHandleException e) { + throw e; } catch (Exception e) { throw new ProtocolHandleException("Failed to convert batch to CloudEvents", e); } @@ -179,12 +180,10 @@ public List toBatchCloudEvent(ProtocolTransportObject protocol) thro @Override public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException { try { - // Check if this is an A2A/MCP CloudEvent if (isA2ACloudEvent(cloudEvent)) { return convertCloudEventToA2A(cloudEvent); } - // Determine target protocol from CloudEvent extensions String targetProtocol = getTargetProtocol(cloudEvent); switch (targetProtocol.toLowerCase()) { @@ -234,7 +233,10 @@ public Set getCapabilities() { "mcp-jsonrpc", "agent-communication", "workflow-orchestration", - "collaboration" + "collaboration", + "agent-discovery", + "agent-card-registry", + "agent-status" ); } @@ -246,13 +248,11 @@ public boolean isValid(ProtocolTransportObject protocol) { try { String content = protocol.toString(); - // Fast fail if (!content.contains("{")) { return false; } JsonNode node = objectMapper.readTree(content); - // Valid if JSON-RPC if (node.has("jsonrpc")) { return true; } @@ -271,108 +271,144 @@ public boolean isValid(ProtocolTransportObject protocol) { } private boolean isA2ACloudEvent(CloudEvent cloudEvent) { - return PROTOCOL_TYPE.equals(cloudEvent.getExtension("protocol")) - || cloudEvent.getType().startsWith("org.apache.eventmesh.a2a") - || cloudEvent.getExtension("a2amethod") != null; + return PROTOCOL_TYPE.equals(cloudEvent.getExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL)) + || cloudEvent.getType().startsWith(A2AProtocolConstants.CE_TYPE_PREFIX) + || cloudEvent.getExtension(A2AProtocolConstants.CE_EXTENSION_A2A_METHOD) != null; } /** - * Converts a modern MCP / A2A JSON-RPC message to CloudEvent. - * Distinguishes between Requests and Responses for Event-Driven Async RPC pattern. + * Converts an A2A JSON-RPC message to CloudEvent. + * Handles Agent Card operations with discovery topic routing. */ - private CloudEvent convertMcpToCloudEvent(JsonNode node, String content) throws ProtocolHandleException { + private CloudEvent convertA2AToCloudEvent(JsonNode node, String content) throws ProtocolHandleException { try { boolean isRequest = node.has("method"); - boolean isResponse = node.has("result") - || node.has("error"); + boolean isResponse = node.has("result") || node.has("error"); String id = node.has("id") ? node.get("id").asText() : generateMessageId(); String ceType; String mcpType; String correlationId = null; - String eventId = isRequest ? id : generateMessageId(); // For request, CE id = RPC id. For response, CE id is new. + String eventId = isRequest ? id : generateMessageId(); CloudEventBuilder builder = CloudEventBuilder.v1() .withSource(java.net.URI.create("eventmesh-a2a")) .withData(content.getBytes(StandardCharsets.UTF_8)) - .withExtension("protocol", PROTOCOL_TYPE) - .withExtension("protocolversion", PROTOCOL_VERSION); + .withExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL, PROTOCOL_TYPE) + .withExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL_VERSION, PROTOCOL_VERSION); if (isRequest) { - // JSON-RPC Request -> Event String method = node.get("method").asText(); - - // Determine suffix based on operation type - String suffix = ".req"; - if (A2AProtocolConstants.OP_SEND_STREAMING_MESSAGE.equals(method)) { - suffix = ".stream"; - } - - ceType = "org.apache.eventmesh.a2a." + method.replace("/", ".") + suffix; mcpType = "request"; - builder.withExtension("a2amethod", method); - - // Extract optional params for routing - if (node.has("params")) { - JsonNode params = node.get("params"); - - // 1. Pub/Sub Routing (Priority): Broadcast to a Topic - if (params.has("_topic")) { - builder.withSubject(params.get("_topic").asText()); - } else if (params.has("_agentId")) { - // 2. P2P Routing (Fallback): Unicast to specific Agent - builder.withExtension("targetagent", params.get("_agentId").asText()); - } - - // 3. Sequencing for Streaming - if (params.has("_seq")) { - builder.withExtension("seq", params.get("_seq").asText()); - } + if (A2AProtocolConstants.isAgentCardOperation(method)) { + ceType = buildAgentCardCloudEventType(method); + builder.withExtension(A2AProtocolConstants.CE_EXTENSION_A2A_METHOD, method); + extractAgentCardRouting(node, builder, method); + } else if (A2AProtocolConstants.OP_SEND_STREAMING_MESSAGE.equals(method)) { + ceType = A2AProtocolConstants.CE_TYPE_PREFIX + method.replace("/", ".") + ".stream"; + builder.withExtension(A2AProtocolConstants.CE_EXTENSION_A2A_METHOD, method); + extractStandardRouting(node, builder); + } else { + ceType = A2AProtocolConstants.CE_TYPE_PREFIX + method.replace("/", ".") + ".req"; + builder.withExtension(A2AProtocolConstants.CE_EXTENSION_A2A_METHOD, method); + extractStandardRouting(node, builder); } } else if (isResponse) { - // JSON-RPC Response -> Event - // We map the RPC ID to correlationId so the requester can match it - ceType = "org.apache.eventmesh.a2a.common.response"; + ceType = A2AProtocolConstants.CE_TYPE_PREFIX + "common.response"; mcpType = "response"; correlationId = id; - - builder.withExtension("collaborationid", correlationId); + builder.withExtension(A2AProtocolConstants.CE_EXTENSION_COLLABORATION_ID, correlationId); } else { - // Notification or invalid - ceType = "org.apache.eventmesh.a2a.unknown"; + ceType = A2AProtocolConstants.CE_TYPE_PREFIX + "unknown"; mcpType = "unknown"; } builder.withId(eventId) .withType(ceType) - .withExtension("mcptype", mcpType); + .withExtension(A2AProtocolConstants.CE_EXTENSION_MCP_TYPE, mcpType); return builder.build(); } catch (Exception e) { - throw new ProtocolHandleException("Failed to convert MCP/A2A message to CloudEvent", e); + throw new ProtocolHandleException("Failed to convert A2A message to CloudEvent", e); } } - private ProtocolTransportObject convertCloudEventToA2A(CloudEvent cloudEvent) - throws ProtocolHandleException { - try { - if (cloudEventsAdaptor != null) { - try { - return cloudEventsAdaptor.fromCloudEvent(cloudEvent); - } catch (Exception ignored) { - // ignore + private String buildAgentCardCloudEventType(String method) { + return A2AProtocolConstants.CE_TYPE_PREFIX + method.replace("/", ".") + ".req"; + } + + /** + * Extracts routing for Agent Card operations. + * For register/update: route to discovery topic with org_id/unit_id/agent_id from params. + * For get/delete: route to discovery topic for lookup. + * For list: route to discovery topic with wildcard. + */ + private void extractAgentCardRouting(JsonNode node, CloudEventBuilder builder, String method) { + if (!node.has("params")) { + return; + } + JsonNode params = node.get("params"); + + String orgId = getTextParam(params, "org_id"); + String unitId = getTextParam(params, "unit_id"); + String agentId = getTextParam(params, "agent_id"); + + if (orgId != null && unitId != null && agentId != null) { + AgentIdentity identity = new AgentIdentity(orgId, unitId, agentId); + builder.withSubject(identity.discoveryTopic()); + } + + if (params.has("card")) { + // Validate card if validator is available + if (cardValidator != null) { + String cardJson = params.get("card").toString(); + AgentCardValidator.ValidationResult result = cardValidator.validate(cardJson); + if (!result.isValid()) { + log.warn("Agent card validation failed: {}", result.getErrorMessage()); } } + } + } + + private void extractStandardRouting(JsonNode node, CloudEventBuilder builder) { + if (!node.has("params")) { + return; + } + JsonNode params = node.get("params"); - byte[] data = cloudEvent.getData() != null ? cloudEvent.getData().toBytes() : new byte[0]; - String content = new String(data, StandardCharsets.UTF_8); - return new SimpleA2AProtocolTransportObject(content, cloudEvent); + if (params.has("_topic")) { + builder.withSubject(params.get("_topic").asText()); + } else if (params.has("_agentId")) { + builder.withExtension(A2AProtocolConstants.CE_EXTENSION_TARGET_AGENT, params.get("_agentId").asText()); + } - } catch (Exception e) { - throw new ProtocolHandleException("Failed to convert CloudEvent to A2A", e); + if (params.has("_seq")) { + builder.withExtension(A2AProtocolConstants.CE_EXTENSION_SEQ, params.get("_seq").asText()); + } + } + + private String getTextParam(JsonNode params, String key) { + if (params.has(key)) { + JsonNode val = params.get(key); + return val.isTextual() ? val.asText() : null; + } + return null; + } + + private ProtocolTransportObject convertCloudEventToA2A(CloudEvent cloudEvent) { + if (cloudEventsAdaptor != null) { + try { + return cloudEventsAdaptor.fromCloudEvent(cloudEvent); + } catch (Exception ignored) { + // ignore + } } + + byte[] data = cloudEvent.getData() != null ? cloudEvent.getData().toBytes() : new byte[0]; + String content = new String(data, StandardCharsets.UTF_8); + return new SimpleA2AProtocolTransportObject(content, cloudEvent); } private String getTargetProtocol(CloudEvent cloudEvent) { @@ -391,6 +427,7 @@ private String getTargetProtocol(CloudEvent cloudEvent) { } private static class SimpleA2AProtocolTransportObject implements ProtocolTransportObject { + private final String content; private final CloudEvent sourceCloudEvent; @@ -416,6 +453,6 @@ private Set createCapabilitiesSet(String... capabilities) { } private String generateMessageId() { - return "a2a-mcp-" + System.currentTimeMillis() + "-" + Math.random(); + return "a2a-" + System.currentTimeMillis() + "-" + Math.random(); } } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentCapabilities.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentCapabilities.java new file mode 100644 index 0000000000..44322146ba --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentCapabilities.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a.model; + +import java.io.Serializable; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Defines optional capabilities supported by an agent. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AgentCapabilities implements Serializable { + + private static final long serialVersionUID = 1L; + + private Boolean streaming; + + @JsonProperty("pushNotifications") + private Boolean pushNotifications; + + @JsonProperty("extendedAgentCard") + private Boolean extendedAgentCard; + + private List extensions; +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentCard.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentCard.java new file mode 100644 index 0000000000..6f1787874c --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentCard.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a.model; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Represents an A2A Agent Card as defined by the A2A protocol specification. + * Reference: https://a2a-protocol.org/latest/specification/#agent-card + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AgentCard implements Serializable { + + private static final long serialVersionUID = 1L; + + private String name; + + private String description; + + private String version; + + @JsonProperty("supportedInterfaces") + private List supportedInterfaces; + + private AgentProvider provider; + + private AgentCapabilities capabilities; + + private List skills; + + private List signatures; + + @JsonProperty("securitySchemes") + private Map securitySchemes; + + @JsonProperty("securityRequirements") + private List securityRequirements; + + @JsonProperty("defaultInputModes") + private List defaultInputModes; + + @JsonProperty("defaultOutputModes") + private List defaultOutputModes; + + @JsonProperty("documentationUrl") + private String documentationUrl; + + @JsonProperty("iconUrl") + private String iconUrl; +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentCardSignature.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentCardSignature.java new file mode 100644 index 0000000000..13cacbd4fc --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentCardSignature.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a.model; + +import java.io.Serializable; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonInclude; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Represents a JWS signature of an AgentCard (RFC 7515). + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AgentCardSignature implements Serializable { + + private static final long serialVersionUID = 1L; + + private String protect; + + private String signature; + + private Map header; +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentExtension.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentExtension.java new file mode 100644 index 0000000000..ccb19a9c5e --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentExtension.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a.model; + +import java.io.Serializable; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonInclude; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * A declaration of a protocol extension supported by an Agent. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AgentExtension implements Serializable { + + private static final long serialVersionUID = 1L; + + private String uri; + + private String description; + + private Boolean required; + + private Map params; +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentInterface.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentInterface.java new file mode 100644 index 0000000000..d0f398951f --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentInterface.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a.model; + +import java.io.Serializable; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Declares a combination of a target URL, transport and protocol version for interacting with the agent. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AgentInterface implements Serializable { + + private static final long serialVersionUID = 1L; + + private String url; + + @JsonProperty("protocolBinding") + private String protocolBinding; + + @JsonProperty("protocolVersion") + private String protocolVersion; + + private String tenant; +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentProvider.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentProvider.java new file mode 100644 index 0000000000..72e53b39ef --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a.model; + +import java.io.Serializable; + +import com.fasterxml.jackson.annotation.JsonInclude; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Represents the service provider of an agent. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AgentProvider implements Serializable { + + private static final long serialVersionUID = 1L; + + private String url; + + private String organization; +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentSkill.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentSkill.java new file mode 100644 index 0000000000..0e484af21f --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/AgentSkill.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a.model; + +import java.io.Serializable; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Represents a distinct capability or function that an agent can perform. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AgentSkill implements Serializable { + + private static final long serialVersionUID = 1L; + + private String id; + + private String name; + + private String description; + + private List tags; + + private List examples; + + @JsonProperty("inputModes") + private List inputModes; + + @JsonProperty("outputModes") + private List outputModes; + + @JsonProperty("securityRequirements") + private List securityRequirements; +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/SecurityRequirement.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/SecurityRequirement.java new file mode 100644 index 0000000000..cb8b05381f --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/SecurityRequirement.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a.model; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonInclude; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Defines the security requirements for an agent. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SecurityRequirement implements Serializable { + + private static final long serialVersionUID = 1L; + + private Map schemes; + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class StringList implements Serializable { + + private static final long serialVersionUID = 1L; + + private List list; + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/SecurityScheme.java b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/SecurityScheme.java new file mode 100644 index 0000000000..381594232d --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/model/SecurityScheme.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.a2a.model; + +import java.io.Serializable; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Defines a security scheme that can be used to secure an agent's endpoints. + * Discriminated union type based on OpenAPI 3.2 Security Scheme Object. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SecurityScheme implements Serializable { + + private static final long serialVersionUID = 1L; + + @JsonProperty("apiKeySecurityScheme") + private APIKeySecurityScheme apiKeySecurityScheme; + + @JsonProperty("httpAuthSecurityScheme") + private HTTPAuthSecurityScheme httpAuthSecurityScheme; + + @JsonProperty("oauth2SecurityScheme") + private OAuth2SecurityScheme oauth2SecurityScheme; + + @JsonProperty("openIdConnectSecurityScheme") + private OpenIdConnectSecurityScheme openIdConnectSecurityScheme; + + @JsonProperty("mtlsSecurityScheme") + private MutualTlsSecurityScheme mtlsSecurityScheme; + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class APIKeySecurityScheme implements Serializable { + + private static final long serialVersionUID = 1L; + + private String description; + private String location; + private String name; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class HTTPAuthSecurityScheme implements Serializable { + + private static final long serialVersionUID = 1L; + + private String description; + private String scheme; + + @JsonProperty("bearerFormat") + private String bearerFormat; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class OAuth2SecurityScheme implements Serializable { + + private static final long serialVersionUID = 1L; + + private String description; + private OAuthFlows flows; + + @JsonProperty("oauth2MetadataUrl") + private String oauth2MetadataUrl; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class OpenIdConnectSecurityScheme implements Serializable { + + private static final long serialVersionUID = 1L; + + private String description; + + @JsonProperty("openIdConnectUrl") + private String openIdConnectUrl; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class MutualTlsSecurityScheme implements Serializable { + + private static final long serialVersionUID = 1L; + + private String description; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class OAuthFlows implements Serializable { + + private static final long serialVersionUID = 1L; + + @JsonProperty("authorizationCode") + private AuthorizationCodeOAuthFlow authorizationCode; + + @JsonProperty("clientCredentials") + private ClientCredentialsOAuthFlow clientCredentials; + + @JsonProperty("deviceCode") + private DeviceCodeOAuthFlow deviceCode; + + @JsonProperty("implicit") + private ImplicitOAuthFlow implicit; + + @JsonProperty("password") + private PasswordOAuthFlow password; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class AuthorizationCodeOAuthFlow implements Serializable { + + private static final long serialVersionUID = 1L; + + @JsonProperty("authorizationUrl") + private String authorizationUrl; + + @JsonProperty("tokenUrl") + private String tokenUrl; + + @JsonProperty("refreshUrl") + private String refreshUrl; + + private Map scopes; + + @JsonProperty("pkceRequired") + private Boolean pkceRequired; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class ClientCredentialsOAuthFlow implements Serializable { + + private static final long serialVersionUID = 1L; + + @JsonProperty("tokenUrl") + private String tokenUrl; + + @JsonProperty("refreshUrl") + private String refreshUrl; + + private Map scopes; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class DeviceCodeOAuthFlow implements Serializable { + + private static final long serialVersionUID = 1L; + + @JsonProperty("deviceAuthorizationUrl") + private String deviceAuthorizationUrl; + + @JsonProperty("tokenUrl") + private String tokenUrl; + + @JsonProperty("refreshUrl") + private String refreshUrl; + + private Map scopes; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class ImplicitOAuthFlow implements Serializable { + + private static final long serialVersionUID = 1L; + + @JsonProperty("authorizationUrl") + private String authorizationUrl; + + @JsonProperty("refreshUrl") + private String refreshUrl; + + private Map scopes; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class PasswordOAuthFlow implements Serializable { + + private static final long serialVersionUID = 1L; + + @JsonProperty("tokenUrl") + private String tokenUrl; + + @JsonProperty("refreshUrl") + private String refreshUrl; + + private Map scopes; + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/resources/a2a/agent_card_schema.json b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/resources/a2a/agent_card_schema.json new file mode 100644 index 0000000000..db9b4946f2 --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/resources/a2a/agent_card_schema.json @@ -0,0 +1,732 @@ +{ + "$schema": "http://json-schema.org/draft-06/schema#", + "title": "A2A Agent Card", + "type": "object", + "required": [ + "name", + "description", + "supportedInterfaces", + "version", + "capabilities", + "defaultInputModes", + "defaultOutputModes", + "skills" + ], + "patternProperties": { + "^(supported_interfaces)$": { + "type": "array", + "description": "Ordered list of supported interfaces. The first entry is preferred.", + "items": { + "$ref": "#/definitions/AgentInterface" + } + }, + "^(documentation_url)$": { + "type": "string", + "description": "A URL providing additional documentation about the agent." + }, + "^(security_schemes)$": { + "type": "object", + "description": "The security scheme details used for authenticating with this agent.", + "additionalProperties": { + "$ref": "#/definitions/SecurityScheme" + }, + "propertyNames": { + "type": "string" + } + }, + "^(security_requirements)$": { + "type": "array", + "description": "Security requirements for contacting the agent.", + "items": { + "$ref": "#/definitions/SecurityRequirement" + } + }, + "^(default_input_modes)$": { + "type": "array", + "items": { "type": "string" }, + "description": "The set of interaction modes that the agent supports across all skills, defined as media types." + }, + "^(default_output_modes)$": { + "type": "array", + "items": { "type": "string" }, + "description": "The media types supported as outputs from this agent." + }, + "^(icon_url)$": { + "type": "string", + "description": "A URL to an icon for the agent." + } + }, + "properties": { + "name": { + "type": "string", + "description": "A human readable name for the agent." + }, + "description": { + "type": "string", + "description": "A human-readable description of the agent, assisting users and other agents in understanding its purpose." + }, + "supportedInterfaces": { + "type": "array", + "description": "Ordered list of supported interfaces. The first entry is preferred.", + "items": { + "$ref": "#/definitions/AgentInterface" + } + }, + "provider": { + "$ref": "#/definitions/AgentProvider" + }, + "version": { + "type": "string", + "description": "The version of the agent (e.g., \"1.0.0\")." + }, + "documentationUrl": { + "type": "string", + "description": "A URL providing additional documentation about the agent." + }, + "capabilities": { + "$ref": "#/definitions/AgentCapabilities" + }, + "securitySchemes": { + "type": "object", + "description": "The security scheme details used for authenticating with this agent.", + "additionalProperties": { + "$ref": "#/definitions/SecurityScheme" + }, + "propertyNames": { + "type": "string" + } + }, + "securityRequirements": { + "type": "array", + "description": "Security requirements for contacting the agent.", + "items": { + "$ref": "#/definitions/SecurityRequirement" + } + }, + "defaultInputModes": { + "type": "array", + "items": { "type": "string" }, + "description": "The set of interaction modes that the agent supports across all skills, defined as media types." + }, + "defaultOutputModes": { + "type": "array", + "items": { "type": "string" }, + "description": "The media types supported as outputs from this agent." + }, + "skills": { + "type": "array", + "description": "Skills represent the abilities of an agent.", + "items": { + "$ref": "#/definitions/AgentSkill" + } + }, + "signatures": { + "type": "array", + "description": "JSON Web Signatures computed for this AgentCard.", + "items": { + "$ref": "#/definitions/AgentCardSignature" + } + }, + "iconUrl": { + "type": "string", + "description": "A URL to an icon for the agent." + } + }, + "additionalProperties": false, + "definitions": { + "AgentInterface": { + "type": "object", + "description": "Declares a combination of a target URL, transport and protocol version for interacting with the agent.", + "required": ["url", "protocolBinding", "protocolVersion"], + "patternProperties": { + "^(protocol_binding)$": { + "type": "string", + "description": "The protocol binding supported at this URL (e.g., JSONRPC, GRPC, HTTP+JSON)." + }, + "^(protocol_version)$": { + "type": "string", + "description": "The version of the A2A protocol this interface exposes (e.g., \"0.3\", \"1.0\")." + } + }, + "properties": { + "url": { + "type": "string", + "description": "The URL where this interface is available." + }, + "protocolBinding": { + "type": "string", + "description": "The protocol binding supported at this URL (e.g., JSONRPC, GRPC, HTTP+JSON)." + }, + "tenant": { + "type": "string", + "description": "Tenant ID to be used in the request when calling the agent." + }, + "protocolVersion": { + "type": "string", + "description": "The version of the A2A protocol this interface exposes (e.g., \"0.3\", \"1.0\")." + } + }, + "additionalProperties": false + }, + "AgentProvider": { + "type": "object", + "description": "Represents the service provider of an agent.", + "required": ["url", "organization"], + "properties": { + "url": { + "type": "string", + "description": "A URL for the agent provider's website or relevant documentation." + }, + "organization": { + "type": "string", + "description": "The name of the agent provider's organization." + } + }, + "additionalProperties": false + }, + "AgentCapabilities": { + "type": "object", + "description": "Defines optional capabilities supported by an agent.", + "patternProperties": { + "^(push_notifications)$": { + "type": "boolean", + "description": "Indicates if the agent supports sending push notifications for asynchronous task updates." + }, + "^(extended_agent_card)$": { + "type": "boolean", + "description": "Indicates if the agent supports providing an extended agent card when authenticated." + } + }, + "properties": { + "streaming": { + "type": "boolean", + "description": "Indicates if the agent supports streaming responses." + }, + "pushNotifications": { + "type": "boolean", + "description": "Indicates if the agent supports sending push notifications for asynchronous task updates." + }, + "extensions": { + "type": "array", + "description": "A list of protocol extensions supported by the agent.", + "items": { + "$ref": "#/definitions/AgentExtension" + } + }, + "extendedAgentCard": { + "type": "boolean", + "description": "Indicates if the agent supports providing an extended agent card when authenticated." + } + }, + "additionalProperties": false + }, + "AgentExtension": { + "type": "object", + "description": "A declaration of a protocol extension supported by an Agent.", + "properties": { + "uri": { + "type": "string", + "description": "The unique URI identifying the extension." + }, + "description": { + "type": "string", + "description": "A human-readable description of how this agent uses the extension." + }, + "required": { + "type": "boolean", + "description": "If true, the client must understand and comply with the extension's requirements." + }, + "params": { + "type": "object", + "description": "Extension-specific configuration parameters." + } + }, + "additionalProperties": false + }, + "AgentSkill": { + "type": "object", + "description": "Represents a distinct capability or function that an agent can perform.", + "required": ["id", "name", "description", "tags"], + "patternProperties": { + "^(input_modes)$": { + "type": "array", + "items": { "type": "string" }, + "description": "The set of supported input media types for this skill, overriding the agent's defaults." + }, + "^(output_modes)$": { + "type": "array", + "items": { "type": "string" }, + "description": "The set of supported output media types for this skill, overriding the agent's defaults." + }, + "^(security_requirements)$": { + "type": "array", + "description": "Security schemes necessary for this skill.", + "items": { + "$ref": "#/definitions/SecurityRequirement" + } + } + }, + "properties": { + "id": { + "type": "string", + "description": "A unique identifier for the agent's skill." + }, + "name": { + "type": "string", + "description": "A human-readable name for the skill." + }, + "description": { + "type": "string", + "description": "A detailed description of the skill." + }, + "tags": { + "type": "array", + "items": { "type": "string" }, + "description": "A set of keywords describing the skill's capabilities." + }, + "examples": { + "type": "array", + "items": { "type": "string" }, + "description": "Example prompts or scenarios that this skill can handle." + }, + "inputModes": { + "type": "array", + "items": { "type": "string" }, + "description": "The set of supported input media types for this skill, overriding the agent's defaults." + }, + "outputModes": { + "type": "array", + "items": { "type": "string" }, + "description": "The set of supported output media types for this skill, overriding the agent's defaults." + }, + "securityRequirements": { + "type": "array", + "description": "Security schemes necessary for this skill.", + "items": { + "$ref": "#/definitions/SecurityRequirement" + } + } + }, + "additionalProperties": false + }, + "AgentCardSignature": { + "type": "object", + "description": "Represents a JWS signature of an AgentCard (RFC 7515).", + "required": ["protected", "signature"], + "properties": { + "protected": { + "type": "string", + "description": "The protected JWS header for the signature, base64url-encoded JSON object." + }, + "signature": { + "type": "string", + "description": "The computed signature, base64url-encoded." + }, + "header": { + "type": "object", + "description": "The unprotected JWS header values." + } + }, + "additionalProperties": false + }, + "SecurityRequirement": { + "type": "object", + "description": "Defines the security requirements for an agent.", + "properties": { + "schemes": { + "type": "object", + "description": "A map of security scheme names to the required scopes.", + "additionalProperties": { + "$ref": "#/definitions/StringList" + }, + "propertyNames": { + "type": "string" + } + } + }, + "additionalProperties": false + }, + "StringList": { + "type": "object", + "description": "A list of strings.", + "properties": { + "list": { + "type": "array", + "description": "The individual string values.", + "items": { "type": "string" } + } + }, + "additionalProperties": false + }, + "SecurityScheme": { + "type": "object", + "description": "Defines a security scheme that can be used to secure an agent's endpoints. This is a discriminated union type based on the OpenAPI 3.2 Security Scheme Object.", + "patternProperties": { + "^(api_key_security_scheme)$": { + "$ref": "#/definitions/APIKeySecurityScheme", + "description": "API key-based authentication." + }, + "^(http_auth_security_scheme)$": { + "$ref": "#/definitions/HTTPAuthSecurityScheme", + "description": "HTTP authentication (Basic, Bearer, etc.)." + }, + "^(oauth2_security_scheme)$": { + "$ref": "#/definitions/OAuth2SecurityScheme", + "description": "OAuth 2.0 authentication." + }, + "^(open_id_connect_security_scheme)$": { + "$ref": "#/definitions/OpenIdConnectSecurityScheme", + "description": "OpenID Connect authentication." + }, + "^(mtls_security_scheme)$": { + "$ref": "#/definitions/MutualTlsSecurityScheme", + "description": "Mutual TLS authentication." + } + }, + "properties": { + "apiKeySecurityScheme": { + "$ref": "#/definitions/APIKeySecurityScheme", + "description": "API key-based authentication." + }, + "httpAuthSecurityScheme": { + "$ref": "#/definitions/HTTPAuthSecurityScheme", + "description": "HTTP authentication (Basic, Bearer, etc.)." + }, + "oauth2SecurityScheme": { + "$ref": "#/definitions/OAuth2SecurityScheme", + "description": "OAuth 2.0 authentication." + }, + "openIdConnectSecurityScheme": { + "$ref": "#/definitions/OpenIdConnectSecurityScheme", + "description": "OpenID Connect authentication." + }, + "mtlsSecurityScheme": { + "$ref": "#/definitions/MutualTlsSecurityScheme", + "description": "Mutual TLS authentication." + } + }, + "additionalProperties": false + }, + "APIKeySecurityScheme": { + "type": "object", + "description": "Defines a security scheme using an API key.", + "properties": { + "description": { + "type": "string", + "description": "An optional description for the security scheme." + }, + "location": { + "type": "string", + "description": "The location of the API key. Valid values are \"query\", \"header\", or \"cookie\"." + }, + "name": { + "type": "string", + "description": "The name of the header, query, or cookie parameter to be used." + } + }, + "additionalProperties": false + }, + "HTTPAuthSecurityScheme": { + "type": "object", + "description": "Defines a security scheme using HTTP authentication.", + "patternProperties": { + "^(bearer_format)$": { + "type": "string", + "description": "A hint to identify how the bearer token is formatted (e.g., \"JWT\")." + } + }, + "properties": { + "description": { + "type": "string", + "description": "An optional description for the security scheme." + }, + "scheme": { + "type": "string", + "description": "The HTTP Authentication scheme (e.g., Bearer, Basic)." + }, + "bearerFormat": { + "type": "string", + "description": "A hint to identify how the bearer token is formatted (e.g., \"JWT\")." + } + }, + "additionalProperties": false + }, + "OAuth2SecurityScheme": { + "type": "object", + "description": "Defines a security scheme using OAuth 2.0.", + "patternProperties": { + "^(oauth2_metadata_url)$": { + "type": "string", + "description": "URL to the OAuth2 authorization server metadata (RFC 8414)." + } + }, + "properties": { + "description": { + "type": "string", + "description": "An optional description for the security scheme." + }, + "flows": { + "$ref": "#/definitions/OAuthFlows", + "description": "An object containing configuration information for the supported OAuth 2.0 flows." + }, + "oauth2MetadataUrl": { + "type": "string", + "description": "URL to the OAuth2 authorization server metadata (RFC 8414)." + } + }, + "additionalProperties": false + }, + "OpenIdConnectSecurityScheme": { + "type": "object", + "description": "Defines a security scheme using OpenID Connect.", + "patternProperties": { + "^(open_id_connect_url)$": { + "type": "string", + "description": "The OpenID Connect Discovery URL." + } + }, + "properties": { + "description": { + "type": "string", + "description": "An optional description for the security scheme." + }, + "openIdConnectUrl": { + "type": "string", + "description": "The OpenID Connect Discovery URL." + } + }, + "additionalProperties": false + }, + "MutualTlsSecurityScheme": { + "type": "object", + "description": "Defines a security scheme using mTLS authentication.", + "properties": { + "description": { + "type": "string", + "description": "An optional description for the security scheme." + } + }, + "additionalProperties": false + }, + "OAuthFlows": { + "type": "object", + "description": "Defines the configuration for the supported OAuth 2.0 flows.", + "patternProperties": { + "^(authorization_code)$": { + "$ref": "#/definitions/AuthorizationCodeOAuthFlow", + "description": "Configuration for the OAuth Authorization Code flow." + }, + "^(client_credentials)$": { + "$ref": "#/definitions/ClientCredentialsOAuthFlow", + "description": "Configuration for the OAuth Client Credentials flow." + }, + "^(device_code)$": { + "$ref": "#/definitions/DeviceCodeOAuthFlow", + "description": "Configuration for the OAuth Device Code flow." + } + }, + "properties": { + "authorizationCode": { + "$ref": "#/definitions/AuthorizationCodeOAuthFlow", + "description": "Configuration for the OAuth Authorization Code flow." + }, + "clientCredentials": { + "$ref": "#/definitions/ClientCredentialsOAuthFlow", + "description": "Configuration for the OAuth Client Credentials flow." + }, + "deviceCode": { + "$ref": "#/definitions/DeviceCodeOAuthFlow", + "description": "Configuration for the OAuth Device Code flow." + }, + "implicit": { + "$ref": "#/definitions/ImplicitOAuthFlow", + "description": "Deprecated: Use Authorization Code + PKCE instead." + }, + "password": { + "$ref": "#/definitions/PasswordOAuthFlow", + "description": "Deprecated: Use Authorization Code + PKCE or Device Code." + } + }, + "additionalProperties": false + }, + "AuthorizationCodeOAuthFlow": { + "type": "object", + "description": "Defines configuration details for the OAuth 2.0 Authorization Code flow.", + "patternProperties": { + "^(authorization_url)$": { + "type": "string", + "description": "The authorization URL to be used for this flow." + }, + "^(token_url)$": { + "type": "string", + "description": "The token URL to be used for this flow." + }, + "^(refresh_url)$": { + "type": "string", + "description": "The URL to be used for obtaining refresh tokens." + }, + "^(pkce_required)$": { + "type": "boolean", + "description": "Indicates if PKCE (RFC 7636) is required for this flow." + } + }, + "properties": { + "authorizationUrl": { + "type": "string", + "description": "The authorization URL to be used for this flow." + }, + "tokenUrl": { + "type": "string", + "description": "The token URL to be used for this flow." + }, + "refreshUrl": { + "type": "string", + "description": "The URL to be used for obtaining refresh tokens." + }, + "scopes": { + "type": "object", + "description": "The available scopes for the OAuth2 security scheme.", + "additionalProperties": { "type": "string" }, + "propertyNames": { "type": "string" } + }, + "pkceRequired": { + "type": "boolean", + "description": "Indicates if PKCE (RFC 7636) is required for this flow." + } + }, + "additionalProperties": false + }, + "ClientCredentialsOAuthFlow": { + "type": "object", + "description": "Defines configuration details for the OAuth 2.0 Client Credentials flow.", + "patternProperties": { + "^(token_url)$": { + "type": "string", + "description": "The token URL to be used for this flow." + }, + "^(refresh_url)$": { + "type": "string", + "description": "The URL to be used for obtaining refresh tokens." + } + }, + "properties": { + "tokenUrl": { + "type": "string", + "description": "The token URL to be used for this flow." + }, + "refreshUrl": { + "type": "string", + "description": "The URL to be used for obtaining refresh tokens." + }, + "scopes": { + "type": "object", + "description": "The available scopes for the OAuth2 security scheme.", + "additionalProperties": { "type": "string" }, + "propertyNames": { "type": "string" } + } + }, + "additionalProperties": false + }, + "DeviceCodeOAuthFlow": { + "type": "object", + "description": "Defines configuration details for the OAuth 2.0 Device Code flow (RFC 8628).", + "patternProperties": { + "^(device_authorization_url)$": { + "type": "string", + "description": "The device authorization endpoint URL." + }, + "^(token_url)$": { + "type": "string", + "description": "The token URL to be used for this flow." + }, + "^(refresh_url)$": { + "type": "string", + "description": "The URL to be used for obtaining refresh tokens." + } + }, + "properties": { + "deviceAuthorizationUrl": { + "type": "string", + "description": "The device authorization endpoint URL." + }, + "tokenUrl": { + "type": "string", + "description": "The token URL to be used for this flow." + }, + "refreshUrl": { + "type": "string", + "description": "The URL to be used for obtaining refresh tokens." + }, + "scopes": { + "type": "object", + "description": "The available scopes for the OAuth2 security scheme.", + "additionalProperties": { "type": "string" }, + "propertyNames": { "type": "string" } + } + }, + "additionalProperties": false + }, + "ImplicitOAuthFlow": { + "type": "object", + "description": "Deprecated: Use Authorization Code + PKCE instead.", + "patternProperties": { + "^(authorization_url)$": { + "type": "string", + "description": "The authorization URL to be used for this flow." + }, + "^(refresh_url)$": { + "type": "string", + "description": "The URL to be used for obtaining refresh tokens." + } + }, + "properties": { + "authorizationUrl": { + "type": "string", + "description": "The authorization URL to be used for this flow." + }, + "refreshUrl": { + "type": "string", + "description": "The URL to be used for obtaining refresh tokens." + }, + "scopes": { + "type": "object", + "description": "The available scopes for the OAuth2 security scheme.", + "additionalProperties": { "type": "string" }, + "propertyNames": { "type": "string" } + } + }, + "additionalProperties": false + }, + "PasswordOAuthFlow": { + "type": "object", + "description": "Deprecated: Use Authorization Code + PKCE or Device Code.", + "patternProperties": { + "^(token_url)$": { + "type": "string", + "description": "The token URL to be used for this flow." + }, + "^(refresh_url)$": { + "type": "string", + "description": "The URL to be used for obtaining refresh tokens." + } + }, + "properties": { + "tokenUrl": { + "type": "string", + "description": "The token URL to be used for this flow." + }, + "refreshUrl": { + "type": "string", + "description": "The URL to be used for obtaining refresh tokens." + }, + "scopes": { + "type": "object", + "description": "The available scopes for the OAuth2 security scheme.", + "additionalProperties": { "type": "string" }, + "propertyNames": { "type": "string" } + } + }, + "additionalProperties": false + } + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2ACardHttpHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2ACardHttpHandler.java new file mode 100644 index 0000000000..d5ca610b79 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2ACardHttpHandler.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.a2a; + +import org.apache.eventmesh.protocol.a2a.AgentIdentity; +import org.apache.eventmesh.protocol.a2a.model.AgentCard; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.QueryStringDecoder; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * HTTP handler for A2A Agent Card Registry API. + * Matches EMQX's API pattern: + * - GET /a2a/cards/list - list agent cards + * - GET /a2a/cards/card/{org_id}/{unit_id}/{agent_id} - get specific card + * - POST /a2a/cards/card/{org_id}/{unit_id}/{agent_id} - register card + * - DELETE /a2a/cards/card/{org_id}/{unit_id}/{agent_id} - delete card + */ +@Slf4j +public class A2ACardHttpHandler { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String PATH_LIST = "/a2a/cards/list"; + private static final String PATH_CARD_PREFIX = "/a2a/cards/card/"; + + private final A2APublishSubscribeService a2aService; + + public A2ACardHttpHandler(A2APublishSubscribeService a2aService) { + this.a2aService = a2aService; + } + + public FullHttpResponse handle(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception { + String uri = httpRequest.uri(); + QueryStringDecoder decoder = new QueryStringDecoder(uri); + String path = decoder.path(); + + try { + if (path.equals(PATH_LIST)) { + return handleList(decoder); + } + + if (path.startsWith(PATH_CARD_PREFIX)) { + String[] segments = path.substring(PATH_CARD_PREFIX.length()).split("/"); + if (segments.length != 3) { + return jsonResponse(HttpResponseStatus.BAD_REQUEST, + errorBody("Invalid path. Expected /a2a/cards/card/{org_id}/{unit_id}/{agent_id}")); + } + + String orgId = segments[0]; + String unitId = segments[1]; + String agentId = segments[2]; + + if (httpRequest.method() == HttpMethod.GET) { + return handleGet(orgId, unitId, agentId); + } else if (httpRequest.method() == HttpMethod.POST) { + return handleRegister(orgId, unitId, agentId, httpRequest); + } else if (httpRequest.method() == HttpMethod.DELETE) { + return handleDelete(orgId, unitId, agentId); + } + } + + return jsonResponse(HttpResponseStatus.NOT_FOUND, errorBody("Not found: " + path)); + } catch (Exception e) { + log.error("A2A card handler error: {}", e.getMessage(), e); + return jsonResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, errorBody(e.getMessage())); + } + } + + private FullHttpResponse handleList(QueryStringDecoder decoder) throws Exception { + String orgId = getQueryParam(decoder, "org_id"); + String unitId = getQueryParam(decoder, "unit_id"); + String agentId = getQueryParam(decoder, "agent_id"); + + List cards = a2aService.listCards(orgId, unitId, agentId); + String body = objectMapper.writeValueAsString(cards); + return jsonResponse(HttpResponseStatus.OK, body); + } + + private FullHttpResponse handleGet(String orgId, String unitId, String agentId) throws Exception { + AgentIdentity identity = AgentIdentity.builder() + .orgId(orgId).unitId(unitId).agentId(agentId).build(); + A2APublishSubscribeService.CardEntry card = a2aService.getCard(identity); + + if (card == null) { + return jsonResponse(HttpResponseStatus.NOT_FOUND, errorBody("Card not found")); + } + + String body = objectMapper.writeValueAsString(formatCardOut(card)); + return jsonResponse(HttpResponseStatus.OK, body); + } + + private FullHttpResponse handleRegister(String orgId, String unitId, String agentId, HttpRequest request) throws Exception { + // Read body from request + String requestBody = ""; + if (request instanceof io.netty.handler.codec.http.FullHttpRequest) { + ByteBuf content = ((io.netty.handler.codec.http.FullHttpRequest) request).content(); + requestBody = content.toString(StandardCharsets.UTF_8); + } + + AgentCard card; + try { + card = objectMapper.readValue(requestBody, AgentCard.class); + } catch (Exception e) { + return jsonResponse(HttpResponseStatus.BAD_REQUEST, errorBody("Invalid card JSON: " + e.getMessage())); + } + + AgentIdentity identity = AgentIdentity.builder() + .orgId(orgId).unitId(unitId).agentId(agentId).build(); + A2APublishSubscribeService.RegistrationResult result = a2aService.registerCard(identity, card); + + if (result.isSuccess()) { + return jsonResponse(HttpResponseStatus.NO_CONTENT, ""); + } else { + return jsonResponse(HttpResponseStatus.BAD_REQUEST, errorBody(result.getErrorMessage())); + } + } + + private FullHttpResponse handleDelete(String orgId, String unitId, String agentId) throws Exception { + AgentIdentity identity = AgentIdentity.builder() + .orgId(orgId).unitId(unitId).agentId(agentId).build(); + boolean deleted = a2aService.deleteCard(identity); + + if (deleted) { + return jsonResponse(HttpResponseStatus.NO_CONTENT, ""); + } else { + return jsonResponse(HttpResponseStatus.NOT_FOUND, errorBody("Card not found")); + } + } + + private Map formatCardOut(A2APublishSubscribeService.CardEntry entry) { + Map out = new LinkedHashMap<>(); + out.put("namespace", entry.getNamespace()); + out.put("id", entry.getId()); + out.put("name", entry.getName()); + out.put("version", entry.getVersion()); + out.put("description", entry.getDescription()); + out.put("status", entry.getStatus()); + return out; + } + + private String getQueryParam(QueryStringDecoder decoder, String key) { + List values = decoder.parameters().get(key); + return (values != null && !values.isEmpty()) ? values.get(0) : null; + } + + private String errorBody(String message) { + try { + return objectMapper.writeValueAsString(Collections.singletonMap("error", message)); + } catch (Exception e) { + return "{\"error\":\"" + message + "\"}"; + } + } + + private FullHttpResponse jsonResponse(HttpResponseStatus status, String body) { + ByteBuf content = Unpooled.copiedBuffer(body, StandardCharsets.UTF_8); + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, status, content); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8"); + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); + return response; + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2APublishSubscribeService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2APublishSubscribeService.java new file mode 100644 index 0000000000..c8ada5de08 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2APublishSubscribeService.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.a2a; + +import org.apache.eventmesh.protocol.a2a.A2AProtocolConstants; +import org.apache.eventmesh.protocol.a2a.AgentCardValidator; +import org.apache.eventmesh.protocol.a2a.AgentIdentity; +import org.apache.eventmesh.protocol.a2a.model.AgentCard; +import org.apache.eventmesh.runtime.boot.EventMeshServer; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A2APublishSubscribeService: Manages A2A Agent Card Registry and processes A2A events. + * + *

Features: + * - Agent Card CRUD (register, delete, get, list) + * - Agent status tracking (online/offline) + * - Event processing with status metadata augmentation + * - Hierarchical identity (org_id/unit_id/agent_id) with wildcard queries + */ +@Slf4j +public class A2APublishSubscribeService { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private final EventMeshServer eventMeshServer; + private volatile boolean isStarted = false; + + private final ConcurrentHashMap cardRegistry = new ConcurrentHashMap<>(); + private final ConcurrentHashMap agentStatusMap = new ConcurrentHashMap<>(); + + private AgentCardValidator cardValidator; + + public A2APublishSubscribeService(EventMeshServer eventMeshServer) { + this.eventMeshServer = eventMeshServer; + } + + public void init() throws Exception { + this.cardValidator = new AgentCardValidator(true); + log.info("A2APublishSubscribeService initialized with Agent Card Registry."); + } + + public void start() throws Exception { + isStarted = true; + log.info("A2APublishSubscribeService started."); + } + + public void shutdown() throws Exception { + isStarted = false; + cardRegistry.clear(); + agentStatusMap.clear(); + log.info("A2APublishSubscribeService shutdown."); + } + + // ========================================================================= + // Agent Card Registry Operations + // ========================================================================= + + /** + * Registers an Agent Card. Validates the card and identity before storing. + * + * @param identity the agent identity (org_id/unit_id/agent_id) + * @param card the agent card + * @return RegistrationResult with success/failure + */ + public RegistrationResult registerCard(AgentIdentity identity, AgentCard card) { + if (!isStarted) { + throw new IllegalStateException("A2APublishSubscribeService is not started"); + } + + // Validate identity + if (!identity.isValid()) { + String msg = String.format("Invalid agent identity: orgId=%s, unitId=%s, agentId=%s", + identity.getOrgId(), identity.getUnitId(), identity.getAgentId()); + log.warn(msg); + return RegistrationResult.failure(msg); + } + + // Validate card + try { + String cardJson = objectMapper.writeValueAsString(card); + AgentCardValidator.ValidationResult result = cardValidator.validate(cardJson); + if (!result.isValid()) { + log.warn("Agent card schema validation failed for {}: {}", identity.clientId(), result.getErrorMessage()); + return RegistrationResult.failure(result.getErrorMessage()); + } + } catch (Exception e) { + log.warn("Failed to serialize/validate agent card for {}: {}", identity.clientId(), e.getMessage()); + return RegistrationResult.failure("Card validation error: " + e.getMessage()); + } + + RegisteredCard existing = cardRegistry.put(identity, new RegisteredCard(card, System.currentTimeMillis())); + if (existing != null) { + log.info("Updated agent card for {}", identity.clientId()); + } else { + log.info("Registered new agent card for {}", identity.clientId()); + } + + // Set agent online + agentStatusMap.put(identity.clientId(), AgentStatus.ONLINE); + + return RegistrationResult.success(); + } + + /** + * Deletes an Agent Card from the registry. + */ + public boolean deleteCard(AgentIdentity identity) { + if (!isStarted) { + throw new IllegalStateException("A2APublishSubscribeService is not started"); + } + RegisteredCard removed = cardRegistry.remove(identity); + if (removed != null) { + agentStatusMap.remove(identity.clientId()); + log.info("Deleted agent card for {}", identity.clientId()); + return true; + } + return false; + } + + /** + * Gets a specific Agent Card. + */ + public CardEntry getCard(AgentIdentity identity) { + if (!isStarted) { + throw new IllegalStateException("A2APublishSubscribeService is not started"); + } + RegisteredCard rc = cardRegistry.get(identity); + if (rc == null) { + return null; + } + return new CardEntry(identity, rc.card, lookupAgentStatus(identity)); + } + + /** + * Lists Agent Cards matching the given filters. Use null or "+" for wildcard matching. + */ + public List listCards(String orgId, String unitId, String agentId) { + if (!isStarted) { + throw new IllegalStateException("A2APublishSubscribeService is not started"); + } + List results = new ArrayList<>(); + for (Map.Entry entry : cardRegistry.entrySet()) { + AgentIdentity id = entry.getKey(); + if (id.matchesFilter(orgId, unitId, agentId)) { + results.add(new CardEntry(id, entry.getValue().card, lookupAgentStatus(id))); + } + } + return results; + } + + /** + * Lists all registered Agent Cards. + */ + public List listAllCards() { + return listCards(null, null, null); + } + + // ========================================================================= + // Agent Status + // ========================================================================= + + /** + * Looks up the status of an agent (online/offline). + */ + public String lookupAgentStatus(AgentIdentity identity) { + AgentStatus status = agentStatusMap.get(identity.clientId()); + return status != null ? status.value : A2AProtocolConstants.STATUS_OFFLINE; + } + + /** + * Sets the status of an agent. + */ + public void setAgentStatus(AgentIdentity identity, String status) { + if (A2AProtocolConstants.STATUS_ONLINE.equals(status)) { + agentStatusMap.put(identity.clientId(), AgentStatus.ONLINE); + } else { + agentStatusMap.put(identity.clientId(), AgentStatus.OFFLINE); + } + } + + // ========================================================================= + // Event Processing + // ========================================================================= + + /** + * Processes an A2A CloudEvent. Augments events with agent status metadata. + * + * @param event The CloudEvent to process. + * @return The processed (potentially modified) CloudEvent. + */ + public CloudEvent process(CloudEvent event) { + if (!isStarted) { + throw new IllegalStateException("A2APublishSubscribeService is not started"); + } + + log.debug("Processing A2A event: {}", event.getId()); + + // Check if this is an A2A discovery topic event + String subject = event.getSubject(); + if (subject != null && subject.startsWith(A2AProtocolConstants.TOPIC_NAMESPACE + "/")) { + AgentIdentity identity = AgentIdentity.fromDiscoveryTopic(subject); + if (identity != null) { + return augmentWithStatusMetadata(event, identity); + } + } + + return event; + } + + /** + * Augments a CloudEvent with agent status metadata (a2astatus, a2astatussource extensions). + * Matches EMQX's on_message_delivered hook behavior. + */ + private CloudEvent augmentWithStatusMetadata(CloudEvent event, AgentIdentity identity) { + String status = lookupAgentStatus(identity); + CloudEventBuilder builder = CloudEventBuilder.from(event); + builder.withExtension(A2AProtocolConstants.CE_EXTENSION_A2A_STATUS, status); + builder.withExtension(A2AProtocolConstants.CE_EXTENSION_A2A_STATUS_SOURCE, "eventmesh"); + return builder.build(); + } + + // ========================================================================= + // Inner Types + // ========================================================================= + + private enum AgentStatus { + ONLINE(A2AProtocolConstants.STATUS_ONLINE), + OFFLINE(A2AProtocolConstants.STATUS_OFFLINE); + + final String value; + + AgentStatus(String value) { + this.value = value; + } + } + + private static class RegisteredCard { + + final AgentCard card; + final long registeredAt; + + RegisteredCard(AgentCard card, long registeredAt) { + this.card = card; + this.registeredAt = registeredAt; + } + } + + /** + * Represents a registered agent card entry with identity, card data, and status. + */ + public static class CardEntry { + + private final AgentIdentity identity; + private final AgentCard card; + private final String status; + + public CardEntry(AgentIdentity identity, AgentCard card, String status) { + this.identity = identity; + this.card = card; + this.status = status; + } + + public AgentIdentity getIdentity() { + return identity; + } + + public AgentCard getCard() { + return card; + } + + public String getStatus() { + return status; + } + + public String getNamespace() { + return identity.getNamespace(); + } + + public String getId() { + return identity.clientId(); + } + + public String getName() { + return card.getName(); + } + + public String getVersion() { + return card.getVersion(); + } + + public String getDescription() { + return card.getDescription(); + } + } + + /** + * Result of a card registration attempt. + */ + public static class RegistrationResult { + + private final boolean success; + private final String errorMessage; + + private RegistrationResult(boolean success, String errorMessage) { + this.success = success; + this.errorMessage = errorMessage; + } + + public static RegistrationResult success() { + return new RegistrationResult(true, null); + } + + public static RegistrationResult failure(String errorMessage) { + return new RegistrationResult(false, errorMessage); + } + + public boolean isSuccess() { + return success; + } + + public String getErrorMessage() { + return errorMessage; + } + } +} diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/FilterEngineTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/FilterEngineTest.java new file mode 100644 index 0000000000..4d79ec21da --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/FilterEngineTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.boot; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.when; + +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.runtime.meta.MetaStorage; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class FilterEngineTest { + + @Mock + private MetaStorage metaStorage; + + @Test + public void testStartAndGetFilter() { + FilterEngine filterEngine = new FilterEngine(metaStorage); + + // Mock MetaData + Map filterMetaData = new HashMap<>(); + String group = "testGroup"; + // JSON config for filter + // Condition: source == "testSource" (must be array) + String filterJson = "[{\"topic\":\"testTopic\", \"condition\":{\"source\":[\"testSource\"]}}]"; + filterMetaData.put("filter-" + group, filterJson); + + when(metaStorage.getMetaData(any(String.class), anyBoolean())).thenReturn(filterMetaData); + + // Start Engine + filterEngine.start(); + + // Get Filter + Pattern pattern = filterEngine.getFilterPattern(group + "-testTopic"); + Assertions.assertNotNull(pattern); + + // Verify Filter behavior (optional, depends on Pattern implementation) + // String validEventJson = "{"specversion":"1.0","id":"1","source":"testSource","type":"testType"}"; + // Assertions.assertTrue(pattern.filter(validEventJson)); + } +} diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/RouterEngineTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/RouterEngineTest.java new file mode 100644 index 0000000000..294c3a07bc --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/RouterEngineTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.boot; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.eventmesh.function.api.Router; +import org.apache.eventmesh.runtime.meta.MetaStorage; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class RouterEngineTest { + + @Mock + private MetaStorage metaStorage; + + @Test + public void testStartAndRoute() { + RouterEngine routerEngine = new RouterEngine(metaStorage); + + // Mock MetaData + Map routerMetaData = new HashMap<>(); + String group = "testGroup"; + // JSON config for router + String routerJson = "[{\"topic\":\"sourceTopic\", \"routerConfig\":\"targetTopic\"}]"; + routerMetaData.put("router-" + group, routerJson); + + when(metaStorage.getMetaData(any(String.class), anyBoolean())).thenReturn(routerMetaData); + + // Start Engine + routerEngine.start(); + + // Get Router + Router router = routerEngine.getRouter(group + "-sourceTopic"); + Assertions.assertNotNull(router); + + // Test Route + String target = router.route("{}"); + // Since RouterBuilder uses DefaultRouter which returns the config string directly, + // passing "targetTopic" as config should return "targetTopic". + // However, RouterEngine gets "routerConfig" node from JSON. + // If "routerConfig" is "targetTopic" string, Jackson toString() might quote it like "\"targetTopic\"". + // Let's check RouterEngine logic: routerJsonNode.get("routerConfig").toString() + // If json is {"routerConfig": "targetTopic"}, .get("routerConfig") is a TextNode. + // .toString() on TextNode returns "\"targetTopic\"". + // .asText() returns "targetTopic". + // The code uses .toString(). + + // Wait, RouterBuilder.build(String) + // If it receives "\"targetTopic\"", it returns it. + + // Let's verify behavior. + // Assertions.assertEquals("\"targetTopic\"", target); + // Or maybe I should fix RouterEngine to use .asText() if it expects a simple string? + // But routerConfig can be a complex JSON object for other Routers. + // So .toString() is safer for generic config. + + // For this test, assuming "targetTopic" -> "\"targetTopic\"" + + Assertions.assertEquals("\"targetTopic\"", target); + } +} diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/TransformerEngineTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/TransformerEngineTest.java new file mode 100644 index 0000000000..4ec24f542d --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/TransformerEngineTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.boot; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.when; + +import org.apache.eventmesh.function.transformer.Transformer; +import org.apache.eventmesh.runtime.meta.MetaStorage; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class TransformerEngineTest { + + @Mock + private MetaStorage metaStorage; + + @Test + public void testStartAndGetTransformer() throws Exception { + TransformerEngine transformerEngine = new TransformerEngine(metaStorage); + + // Mock MetaData + Map transformerMetaData = new HashMap<>(); + String group = "testGroup"; + + // JSON config for transformer + // Use "original" which passes through + String transformerJson = "[{\"topic\":\"testTopic\", \"transformerParam\":{\"transformerType\":\"original\"}}]"; + transformerMetaData.put("transformer-" + group, transformerJson); + + when(metaStorage.getMetaData(any(String.class), anyBoolean())).thenReturn(transformerMetaData); + + // Start Engine + transformerEngine.start(); + + // Get Transformer + Transformer transformer = transformerEngine.getTransformer(group + "-testTopic"); + Assertions.assertNotNull(transformer); + + // Verify transform (original returns content as is) + String content = "testContent"; + Assertions.assertEquals(content, transformer.transform(content)); + } +} diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessorTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessorTest.java new file mode 100644 index 0000000000..a700137876 --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessorTest.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.http.processor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.eventmesh.api.SendCallback; +import org.apache.eventmesh.common.protocol.ProtocolTransportObject; +import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.apache.eventmesh.function.api.Router; +import org.apache.eventmesh.protocol.api.ProtocolAdaptor; +import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; +import org.apache.eventmesh.runtime.acl.Acl; +import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; +import org.apache.eventmesh.runtime.boot.EventMeshServer; +import org.apache.eventmesh.runtime.boot.FilterEngine; +import org.apache.eventmesh.runtime.boot.HTTPTrace.TraceOperation; +import org.apache.eventmesh.runtime.boot.RouterEngine; +import org.apache.eventmesh.runtime.boot.TransformerEngine; +import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; +import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; +import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer; +import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; +import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager; +import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext; +import org.apache.eventmesh.runtime.metrics.http.EventMeshHttpMetricsManager; +import org.apache.eventmesh.runtime.metrics.http.HttpMetrics; +import org.apache.eventmesh.runtime.util.RemotingHelper; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import com.google.common.util.concurrent.RateLimiter; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpRequest; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class SendAsyncEventProcessorTest { + + @Mock + private EventMeshHTTPServer eventMeshHTTPServer; + @Mock + private EventMeshServer eventMeshServer; + @Mock + private EventMeshHTTPConfiguration eventMeshHttpConfiguration; + @Mock + private ProducerManager producerManager; + @Mock + private EventMeshProducer eventMeshProducer; + @Mock + private Acl acl; + @Mock + private FilterEngine filterEngine; + @Mock + private TransformerEngine transformerEngine; + @Mock + private RouterEngine routerEngine; + @Mock + private HandlerService.HandlerSpecific handlerSpecific; + @Mock + private ChannelHandlerContext ctx; + @Mock + private Channel channel; + @Mock + private HttpRequest httpRequest; + @Mock + private HttpRetryer httpRetryer; + @Mock + private EventMeshHttpMetricsManager metricsManager; + @Mock + private HttpMetrics httpMetrics; + @Mock + private ProtocolAdaptor protocolAdaptor; + @Mock + private TraceOperation traceOperation; + + private SendAsyncEventProcessor processor; + + @BeforeEach + public void setUp() { + when(eventMeshHTTPServer.getEventMeshServer()).thenReturn(eventMeshServer); + when(eventMeshHTTPServer.getEventMeshHttpConfiguration()).thenReturn(eventMeshHttpConfiguration); + when(eventMeshHttpConfiguration.getEventMeshEventSize()).thenReturn(1024 * 1024); + + when(eventMeshHTTPServer.getProducerManager()).thenReturn(producerManager); + when(eventMeshHTTPServer.getAcl()).thenReturn(acl); + when(eventMeshHTTPServer.getMsgRateLimiter()).thenReturn(RateLimiter.create(1000)); + when(eventMeshHTTPServer.getHttpRetryer()).thenReturn(httpRetryer); + when(eventMeshHTTPServer.getEventMeshHttpMetricsManager()).thenReturn(metricsManager); + when(metricsManager.getHttpMetrics()).thenReturn(httpMetrics); + + when(eventMeshServer.getFilterEngine()).thenReturn(filterEngine); + when(eventMeshServer.getTransformerEngine()).thenReturn(transformerEngine); + when(eventMeshServer.getRouterEngine()).thenReturn(routerEngine); + + processor = new SendAsyncEventProcessor(eventMeshHTTPServer); + } + + @Test + public void testHandler_V1_NormalFlow() throws Exception { + // Mock Context + AsyncContext asyncContext = mock(AsyncContext.class); + HttpEventWrapper wrapper = mock(HttpEventWrapper.class); + when(handlerSpecific.getAsyncContext()).thenReturn(asyncContext); + when(asyncContext.getRequest()).thenReturn(wrapper); + when(handlerSpecific.getCtx()).thenReturn(ctx); + when(ctx.channel()).thenReturn(channel); + + when(handlerSpecific.getTraceOperation()).thenReturn(traceOperation); + + // Mock Wrapper headers + Map headerMap = new HashMap<>(); + headerMap.put(ProtocolKey.PROTOCOL_TYPE, "http"); + when(wrapper.getHeaderMap()).thenReturn(headerMap); + when(wrapper.getSysHeaderMap()).thenReturn(new HashMap<>()); + when(wrapper.getRequestURI()).thenReturn("http://localhost/publish"); + + // Mock Protocol Adaptor + CloudEvent event = CloudEventBuilder.v1() + .withId("id1").withSource(java.net.URI.create("testSource")).withType("testType") + .withSubject("testTopic") + .withExtension(ProtocolKey.ClientInstanceKey.IDC.getKey(), "idc") + .withExtension(ProtocolKey.ClientInstanceKey.PID.getKey(), "123") + .withExtension(ProtocolKey.ClientInstanceKey.SYS.getKey(), "sys") + .withExtension(ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey(), "testGroup") + .withExtension(ProtocolKey.ClientInstanceKey.TOKEN.getKey(), "token") + .withData("testData".getBytes(StandardCharsets.UTF_8)) + .build(); + + try (MockedStatic pluginFactoryMock = Mockito.mockStatic(ProtocolPluginFactory.class); + MockedStatic remotingHelperMock = Mockito.mockStatic(RemotingHelper.class)) { + + pluginFactoryMock.when(() -> ProtocolPluginFactory.getProtocolAdaptor("http")).thenReturn(protocolAdaptor); + when(protocolAdaptor.toCloudEvent(wrapper)).thenReturn(event); + + remotingHelperMock.when(() -> RemotingHelper.parseChannelRemoteAddr(channel)).thenReturn("127.0.0.1"); + + // Mock Producer + when(producerManager.getEventMeshProducer("testGroup", "token")).thenReturn(eventMeshProducer); + when(eventMeshProducer.isStarted()).thenReturn(true); + + // Execute + processor.handler(handlerSpecific, httpRequest); + + // Verify + // 1. Filter/Transformer/Router should be queried + verify(filterEngine).getFilterPattern("testGroup-testTopic"); + verify(transformerEngine).getTransformer("testGroup-testTopic"); + verify(routerEngine).getRouter("testGroup-testTopic"); + + // Verify NO error response + verify(handlerSpecific, times(0)).sendErrorResponse(any(), any(), any(), any()); + + // 2. Send should be called (V1 flow) + verify(eventMeshProducer).send(any(SendMessageContext.class), any(SendCallback.class)); + } + } + + @Test + public void testHandler_V2_RouterFlow() throws Exception { + // Similar setup, but Router returns a new topic + AsyncContext asyncContext = mock(AsyncContext.class); + HttpEventWrapper wrapper = mock(HttpEventWrapper.class); + when(handlerSpecific.getAsyncContext()).thenReturn(asyncContext); + when(asyncContext.getRequest()).thenReturn(wrapper); + when(handlerSpecific.getCtx()).thenReturn(ctx); + when(ctx.channel()).thenReturn(channel); + when(handlerSpecific.getTraceOperation()).thenReturn(traceOperation); + + Map headerMap = new HashMap<>(); + headerMap.put(ProtocolKey.PROTOCOL_TYPE, "http"); + when(wrapper.getHeaderMap()).thenReturn(headerMap); + when(wrapper.getSysHeaderMap()).thenReturn(new HashMap<>()); + when(wrapper.getRequestURI()).thenReturn("http://localhost/publish"); + + CloudEvent event = CloudEventBuilder.v1() + .withId("id1").withSource(java.net.URI.create("testSource")).withType("testType") + .withSubject("oldTopic") // Original Topic + .withExtension(ProtocolKey.ClientInstanceKey.IDC.getKey(), "idc") + .withExtension(ProtocolKey.ClientInstanceKey.PID.getKey(), "123") + .withExtension(ProtocolKey.ClientInstanceKey.SYS.getKey(), "sys") + .withExtension(ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey(), "testGroup") + .withExtension(ProtocolKey.ClientInstanceKey.TOKEN.getKey(), "token") + .withData("testData".getBytes(StandardCharsets.UTF_8)) + .build(); + + try (MockedStatic pluginFactoryMock = Mockito.mockStatic(ProtocolPluginFactory.class); + MockedStatic remotingHelperMock = Mockito.mockStatic(RemotingHelper.class)) { + + pluginFactoryMock.when(() -> ProtocolPluginFactory.getProtocolAdaptor("http")).thenReturn(protocolAdaptor); + when(protocolAdaptor.toCloudEvent(wrapper)).thenReturn(event); + remotingHelperMock.when(() -> RemotingHelper.parseChannelRemoteAddr(channel)).thenReturn("127.0.0.1"); + + when(producerManager.getEventMeshProducer("testGroup", "token")).thenReturn(eventMeshProducer); + when(eventMeshProducer.isStarted()).thenReturn(true); + + // Mock Router + Router router = mock(Router.class); + when(routerEngine.getRouter("testGroup-oldTopic")).thenReturn(router); + when(router.route(anyString())).thenReturn("newTopic"); + + // Execute + processor.handler(handlerSpecific, httpRequest); + + // Verify + verify(handlerSpecific, times(0)).sendErrorResponse(any(), any(), any(), any()); + + // Verify send called + verify(eventMeshProducer).send(any(SendMessageContext.class), any(SendCallback.class)); + // Verify router was called + verify(router).route(anyString()); + } + } +} \ No newline at end of file