Skip to content

[ISSUE #5249]Refactor/unified runtime pipeline and protocol processors#5250

Open
qqeasonchen wants to merge 8 commits intoapache:masterfrom
qqeasonchen:refactor/unified-runtime-pipeline
Open

[ISSUE #5249]Refactor/unified runtime pipeline and protocol processors#5250
qqeasonchen wants to merge 8 commits intoapache:masterfrom
qqeasonchen:refactor/unified-runtime-pipeline

Conversation

@qqeasonchen
Copy link
Copy Markdown
Contributor

Description

#5249 This PR consolidates EventMesh's fragmented runtime architecture into a single unified runtime with standardized processing pipelines.

Commit 1: Unify Connector, Function, and Core Runtime into eventmesh-runtime

  • Removed the standalone eventmesh-runtime-v2 module entirely (33 files, ~1700 lines deleted), including its separate runtime lifecycle, connector/function managers, health/monitor/status services, meta storage, and configuration
  • Moved core abstractions (Router, RouterBuilder, ConnectorEventPublisher, SourceWorker) into eventmesh-runtime and eventmesh-function modules
  • Added RouterEngine, FilterEngine, TransformerEngine as core processing engines in eventmesh-runtime
  • Added EventMeshConnectorBootstrap to manage connector lifecycle within the unified runtime
  • Refactored EventMeshServer to integrate engines and A2A service initialization
  • Refactored ClientGroupWrapper (TCP) to delegate to the new engines

Commit 2: Unified Ingress/Egress Pipelines

  • Introduced IngressProcessor — a centralized pipeline implementing Filter -> Transformer -> Router for incoming messages
  • Introduced EgressProcessor — a centralized pipeline implementing Filter -> Transformer for outgoing messages
  • Refactored ClientGroupWrapper (TCP SDK) to use IngressProcessor and EgressProcessor
  • Refactored EventMeshConnectorBootstrap (Connectors) to use the new processors
  • Updated SinkWorker to support embedded mode for unified runtime execution

Commit 3: Complete HTTP and gRPC Processor Migration

  • HTTP Processors (5): Migrated SendAsyncMessageProcessor, SendSyncMessageProcessor, BatchSendMessageProcessor, BatchSendMessageV2Processor, and SendAsyncEventProcessor to use IngressProcessor
  • gRPC Processors (3): Migrated PublishCloudEventsProcessor, BatchPublishCloudEventProcessor, and RequestCloudEventProcessor with bidirectional pipeline support
  • Added BatchProcessResult — utility class for tracking success/filtered/failed counts
  • Added comprehensive tests: IngressProcessorTest, EgressProcessorTest, BatchProcessResultTest

Key Changes

File Action
eventmesh-runtime-v2/ (entire module) Deleted
eventmesh-runtime/.../core/protocol/IngressProcessor.java Added
eventmesh-runtime/.../core/protocol/EgressProcessor.java Added
eventmesh-runtime/.../core/protocol/BatchProcessResult.java Added
eventmesh-runtime/.../boot/RouterEngine.java Added
eventmesh-runtime/.../boot/EventMeshConnectorBootstrap.java Added
All HTTP/gRPC processors Modified

Test Plan

  • ./gradlew :eventmesh-runtime:test passes
  • All existing unit tests pass with no regressions

qqeasonchen and others added 8 commits December 24, 2025 11:08
… EgressProcessor

- Introduced IngressProcessor (Filter -> Transformer -> Router) and EgressProcessor (Filter -> Transformer) to centralize pipeline logic.
- Refactored ClientGroupWrapper (TCP SDK) to use these processors.
- Refactored EventMeshConnectorBootstrap (Connectors) to use these processors.
- Updated SinkWorker to support embedded mode for unified runtime execution.
- Updated Unified Runtime Design documentation to reflect architectural clarity.
…ine architecture

This commit completes the migration of all HTTP and gRPC protocol processors to use
the unified IngressProcessor and EgressProcessor pipeline architecture, ensuring
consistent Filter-Transformer-Router processing across all protocols.

Changes:
- HTTP Processors (5): Migrated SendAsyncMessageProcessor, SendSyncMessageProcessor,
  BatchSendMessageProcessor, BatchSendMessageV2Processor, and refactored
  SendAsyncEventProcessor to use IngressProcessor
- gRPC Processors (3): Migrated PublishCloudEventsProcessor,
  BatchPublishCloudEventProcessor, and RequestCloudEventProcessor with bidirectional
  pipeline support (Ingress for requests, Egress for responses)
- Added BatchProcessResult utility class to track success/filtered/failed counts for
  batch processing with detailed statistics
- Added comprehensive unit tests: IngressProcessorTest, EgressProcessorTest,
  BatchProcessResultTest, and enhanced SendAsyncEventProcessorTest
- Added IngressProcessor and EgressProcessor getters to EventMeshServer and
  EventMeshGrpcServer for cross-module access
- Updated design documentation (unified-runtime-design.md) to reflect the new
  architecture and migration status
- Updated configuration guide (core-engines-configuration.md) with pipeline key format

Architecture improvements:
- Unified pipeline key format: {producerGroup}-{topic}
- Consistent filter behavior: filtered messages return SUCCESS status
  (except request-reply returns error)
- Router support: topic changes tracked with finalTopic variable
- Batch statistics: detailed success/filtered/failed counts with message IDs
- Bidirectional processing: RequestCloudEventProcessor applies Ingress to requests
  and Egress to responses

All tests pass with no regressions.
@qqeasonchen qqeasonchen requested a review from Copilot May 6, 2026 13:56
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants