Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions docs/plugins/core-engines-configuration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# EventMesh Core Engines Configuration Guide

EventMesh provides powerful core engines (`Filter`, `Transformer`, `Router`) to dynamically process messages. These engines are configured via **MetaStorage** (Governance Center, e.g., Nacos, Etcd), supporting on-demand loading and hot-reloading.

## 0. Core Concepts

Before configuration, it is important to understand the specific role of each engine in the message flow:

* **Filter (The Gatekeeper)**: Decides **"Whether to pass"**.
* It inspects the message (CloudEvent) attributes. If the message matches the rules, it passes; otherwise, it is dropped.
* *Use Case*: Block debug logs from production traffic; Only subscribe to specific event types.

* **Transformer (The Translator)**: Decides **"What it looks like"**.
* It modifies the message content (Payload or Metadata) according to templates or scripts.
* *Use Case*: Convert XML to JSON; Mask sensitive data (PII); Adapt legacy protocols to new standards.

* **Router (The Dispatcher)**: Decides **"Where to go"**.
* It dynamically changes the destination (Topic) of the message.
* *Use Case*: Route traffic to a Canary/Gray release topic; Route high-priority orders to a dedicated queue.

---

## 1. Overview

The configuration is not in local property files but distributed via the MetaStorage. EventMesh listens to specific **Keys** based on client Groups.

- **Data Source**: Configured via `eventMesh.metaStorage.plugin.type`.
- **Loading Mechanism**: Lazy loading & Hot-reloading.
- **Key Format**: `{EnginePrefix}-{GroupName}`.
- **Value Format**: JSON Array.

| Engine | Prefix | Scope | Description |
| :--- | :--- | :--- | :--- |
| **Router** | `router-` | Pub Only | Routes messages to different topics. |
| **Filter** | `filter-` | Pub & Sub | Filters messages based on CloudEvent attributes. |
| **Transformer** | `transformer-` | Pub & Sub | Transforms message content (Payload/Header). |

---

## 2. Router (Routing)

**Scope**: Publish Only (Upstream)
**Key**: `router-{producerGroup}`

Decides the target storage topic for a message sent by a producer.

### Configuration Example (JSON)

```json
[
{
"topic": "original-topic",
"routerConfig": {
"targetTopic": "redirect-topic",
"expression": "data.type == 'urgent'"
}
}
]
```

* **topic**: The original topic the producer sends to.
* **targetTopic**: The actual topic to write to Storage.
* **expression**: Condition to trigger routing (e.g., SpEL).

---

## 3. Filter (Filtering)

**Scope**: Both Publish (Upstream) & Subscribe (Downstream)

### A. Publish Side (Upstream)
**Key**: `filter-{producerGroup}`
**Effect**: Intercepts messages **before** they are sent to Storage.

### B. Subscribe Side (Downstream)
**Key**: `filter-{consumerGroup}`
**Effect**: Intercepts messages **before** they are pushed to the Consumer.

### Configuration Example (JSON)

```json
[
{
"topic": "test-topic",
"filterPattern": {
"source": ["app-a", "app-b"],
"type": [{"prefix": "com.example"}]
}
}
]
```

* **filterPattern**: Rules matching CloudEvent attributes. If a message doesn't match, it is dropped.

---

## 4. Transformer (Transformation)

**Scope**: Both Publish (Upstream) & Subscribe (Downstream)

### A. Publish Side (Upstream)
**Key**: `transformer-{producerGroup}`
**Effect**: Modifies message content **before** sending to Storage.

### B. Subscribe Side (Downstream)
**Key**: `transformer-{consumerGroup}`
**Effect**: Modifies message content **before** pushing to the Consumer.

### Configuration Example (JSON)

```json
[
{
"topic": "raw-topic",
"transformerConfig": {
"transformerType": "template",
"template": "{\"id\": \"${id}\", \"new_content\": \"${data.content}\"}"
}
}
]
```

* **transformerType**: e.g., `original`, `template`.
* **template**: The transformation template definition.

---

## 5. Verification

1. **Publish Config**: Add the JSON config to your Governance Center (e.g., Nacos) with the Data ID `router-MyGroup`.
2. **Send Message**: Use EventMesh SDK to send a message from `MyGroup`.
3. **Observe**:
* For **Router**: Check if the message appears in the `targetTopic` in your MQ.
* For **Filter**: Check if blocked messages are skipped.
* For **Transformer**: Check if the message body in MQ (for Pub) or Consumer (for Sub) is modified.
1 change: 1 addition & 0 deletions eventmesh-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation project(":eventmesh-sdks:eventmesh-sdk-java")
implementation project(":eventmesh-common")
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-a2a")
implementation project(":eventmesh-connectors:eventmesh-connector-spring")
implementation('org.springframework.boot:spring-boot-starter-web') {
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.a2a.demo;

import org.apache.eventmesh.a2a.demo.A2AAbstractDemo;
import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.ExampleConstants;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.a2a.A2AProtocolConstants;
import org.apache.eventmesh.protocol.a2a.AgentIdentity;
import org.apache.eventmesh.protocol.a2a.model.AgentCapabilities;
import org.apache.eventmesh.protocol.a2a.model.AgentCard;
import org.apache.eventmesh.protocol.a2a.model.AgentInterface;
import org.apache.eventmesh.protocol.a2a.model.AgentProvider;
import org.apache.eventmesh.protocol.a2a.model.AgentSkill;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import lombok.extern.slf4j.Slf4j;

/**
* Demo showing A2A Agent Card registration, discovery, and deletion via EventMesh.
*/
@Slf4j
public class AgentCardDemo extends A2AAbstractDemo {

public static void main(String[] args) throws Exception {
EventMeshHttpClientConfig config = initEventMeshHttpClientConfig("a2a-agent-card-demo");
try (EventMeshHttpProducer producer = new EventMeshHttpProducer(config)) {

// 1. Register an Agent Card
registerAgentCard(producer, "my.org", "my.unit", "weather-agent");

// 2. List Agent Cards
listAgentCards(producer);

// 3. Get specific Agent Card
getAgentCard(producer, "my.org", "my.unit", "weather-agent");

// 4. Delete Agent Card
deleteAgentCard(producer, "my.org", "my.unit", "weather-agent");

log.info("AgentCardDemo completed.");
}
}

private static void registerAgentCard(EventMeshHttpProducer producer,
String orgId, String unitId, String agentId) throws Exception {
AgentCard card = buildSampleCard(agentId);

Map<String, Object> params = new HashMap<>();
params.put("org_id", orgId);
params.put("unit_id", unitId);
params.put("agent_id", agentId);
params.put("card", card);

Map<String, Object> request = new HashMap<>();
request.put("jsonrpc", "2.0");
request.put("method", A2AProtocolConstants.OP_REGISTER_AGENT_CARD);
request.put("params", params);
request.put("id", UUID.randomUUID().toString());

CloudEvent event = buildA2ACardEvent(request);
producer.publish(event);
log.info("Registered agent card: {}/{}/{}", orgId, unitId, agentId);
}

private static void listAgentCards(EventMeshHttpProducer producer) throws Exception {
Map<String, Object> params = new HashMap<>();
params.put("org_id", "my.org");

Map<String, Object> request = new HashMap<>();
request.put("jsonrpc", "2.0");
request.put("method", A2AProtocolConstants.OP_LIST_AGENT_CARDS);
request.put("params", params);
request.put("id", UUID.randomUUID().toString());

CloudEvent event = buildA2ACardEvent(request);
producer.publish(event);
log.info("Listed agent cards for org: my.org");
}

private static void getAgentCard(EventMeshHttpProducer producer,
String orgId, String unitId, String agentId) throws Exception {
Map<String, Object> params = new HashMap<>();
params.put("org_id", orgId);
params.put("unit_id", unitId);
params.put("agent_id", agentId);

Map<String, Object> request = new HashMap<>();
request.put("jsonrpc", "2.0");
request.put("method", A2AProtocolConstants.OP_GET_AGENT_CARD);
request.put("params", params);
request.put("id", UUID.randomUUID().toString());

CloudEvent event = buildA2ACardEvent(request);
producer.publish(event);
log.info("Got agent card: {}/{}/{}", orgId, unitId, agentId);
}

private static void deleteAgentCard(EventMeshHttpProducer producer,
String orgId, String unitId, String agentId) throws Exception {
Map<String, Object> params = new HashMap<>();
params.put("org_id", orgId);
params.put("unit_id", unitId);
params.put("agent_id", agentId);

Map<String, Object> request = new HashMap<>();
request.put("jsonrpc", "2.0");
request.put("method", A2AProtocolConstants.OP_DELETE_AGENT_CARD);
request.put("params", params);
request.put("id", UUID.randomUUID().toString());

CloudEvent event = buildA2ACardEvent(request);
producer.publish(event);
log.info("Deleted agent card: {}/{}/{}", orgId, unitId, agentId);
}

private static AgentCard buildSampleCard(String agentId) {
AgentInterface iface = AgentInterface.builder()
.url("http://localhost:8080/a2a")
.protocolBinding("JSONRPC")
.protocolVersion(A2AProtocolConstants.PROTOCOL_VERSION)
.build();

AgentProvider provider = AgentProvider.builder()
.url("https://example.org")
.organization("Example Org")
.build();

AgentCapabilities capabilities = AgentCapabilities.builder()
.streaming(true)
.pushNotifications(true)
.build();

AgentSkill skill = AgentSkill.builder()
.id("weather-query")
.name("Weather Query")
.description("Queries weather information for a given location")
.tags(Arrays.asList("weather", "query"))
.examples(Arrays.asList("What's the weather in Beijing?"))
.build();

return AgentCard.builder()
.name(agentId)
.description("A weather query agent")
.version("1.0.0")
.supportedInterfaces(Collections.singletonList(iface))
.provider(provider)
.capabilities(capabilities)
.skills(Collections.singletonList(skill))
.defaultInputModes(Collections.singletonList("text/plain"))
.defaultOutputModes(Collections.singletonList("text/plain"))
.build();
}

private static CloudEvent buildA2ACardEvent(Map<String, Object> jsonRpcBody) {
String content = JsonUtils.toJSONString(jsonRpcBody);
String method = (String) jsonRpcBody.get("method");
String ceType = A2AProtocolConstants.CE_TYPE_PREFIX + method.replace("/", ".") + ".req";

return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("a2a-agent-card-demo"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(ceType)
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL, "A2A")
.withExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL_VERSION, "2.0")
.withExtension(A2AProtocolConstants.CE_EXTENSION_A2A_METHOD, method)
.withExtension(A2AProtocolConstants.CE_EXTENSION_MCP_TYPE, "request")
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4_000))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.function.router;

import org.apache.eventmesh.function.api.Router;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class RouterBuilderTest {

@Test
public void testBuild() {
String targetTopic = "targetTopic";
Router router = RouterBuilder.build(targetTopic);
Assertions.assertNotNull(router);
Assertions.assertEquals(targetTopic, router.route("{}"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
implementation "io.cloudevents:cloudevents-core"
implementation "io.cloudevents:cloudevents-json-jackson"
implementation "com.fasterxml.jackson.core:jackson-databind"
implementation "com.networknt:json-schema-validator:1.5.6"
implementation "org.slf4j:slf4j-api"

compileOnly 'org.projectlombok:lombok'
Expand Down
Loading