Move the NodeStream server to internal/stream#271
Open
Conversation
This commit moves the core stream management logic from the `gorums` package into a new `internal/stream` package to improve encapsulation and testability. Key changes: - Moved channel.go to `internal/stream` containing `Channel`, `Request`, and `Message` types along with their associated logic. - `Channel` now encapsulates the `grpc.ClientStream` lifecycle, handling connection establishment, reconnection, and message routing. - The `Node` struct delegates underlying communication and metrics (Latency, LastErr) to `Channel`. - Refactored `sender` and `receiver` loops in `Channel` for better robustness and to properly drain the send queue on closure, fixing potential deadlocks. - Updated `channel_test.go` with robust, table-driven tests covering connection states, context cancellation, and concurrent sends. - Added benchmarks for channel performance and stream reconnection.
The UnconnectedNodeHasNoStream case failed because the test helper newChannelWithoutStream would reuse an already Ready connection from the setupChannel, causing ensureStream to succeed when the test case initialization expected it to fail. We fixed this by using the setupChannelWithoutServer test helper instead, which does not create a server connection initially.
Previously, Enqueue() registered each request in responseRouters before it was sent on the stream. This meant cancelPendingMsgs() could cancel requests that were still on sendQ or were waiting for a stream to become available, i.e., requests that had never been in-flight at all. Move registration to sendMsg(), immediately before stream.Send(), so the router only holds requests that are being actively transmitted. Introduce replyError() to deliver errors to requests that are not yet registered: send-queue drain on close, ensureStream failures, and Enqueue failures. Update sender() to use routeResponse() after a sendMsg error, since the request is now registered in the router at that point and routeResponse handles map cleanup correctly. In channel_test.go, add waitForDisconnection() and rewrite TestChannelStreamReadyAfterReconnect to drive the stream break via the server (breakStreamServer) rather than calling clearStream() directly. The old test raced with the receiver goroutine's own clearStream() call, causing the test to hang waiting on the streamReady channel. Remaining issue: cancelPendingMsgs is still too coarse. If sendMsg's own clearStream() (on stream.Send failure) clears S1 and the sender immediately creates S2 and registers a new request, the receiver's subsequent cancelPendingMsgs for S1's Recv error will spuriously cancel that request. The receiver's clearStream() may also kill the healthy S2. A follow-up commit will provide fix for this issue as well.
This commit addresses a critical race condition where the receiver could cancel a newly established stream's context when calling clearStream on a stale stream reference. It also introduces automatic retry for non-streaming requests when the stream breaks. Key changes: - clearStream now takes a stale stream parameter and only clears if it matches the current stream, preventing spurious cancellation of new streams - Added requeuePendingMsgs to automatically requeue non-streaming requests for retry on the next stream, improving resilience - Streaming requests are still cancelled since they are bound to a specific stream and cannot be meaningfully retried - Added TestChannelConcurrentStreamReconnect to validate correct handling of concurrent requests during stream reconnection This ensures requests sent on the current stream are never spuriously cancelled by cleanup of a stale stream, and non-streaming requests are automatically retried when the stream breaks.
This reuses Enqueue() to requeue messages sent on the old stream, so that context cancellation can cancel such messages.
This avoids holding the lock when sending on the ResponseChan channel, either as an actual response or due to cancellation of messages.
This avoids a select call at the end of every receiver() goroutine loop iteration. Instead we only break out if the stream.Recv() call failed with a context cancelled error.
The goroutine that monitored request context cancellation during stream.Send() was defensive but unnecessary. It attempted to trigger reconnection via clearStream when a request context was cancelled mid-send, but this is redundant because: 1. If stream.Send() fails for any reason (including timeout/cancel), it already calls clearStream to handle the failure 2. If the caller is impatient (short context timeout), triggering reconnection penalizes all other pending requests unnecessarily 3. Tests pass consistently without this logic (verified with -count=100) This simplifies the sendMsg method and removes the overhead of creating a channel and spawning a monitoring goroutine for every send operation.
Move response routing logic for one-way calls from sendMsg to sender(), eliminating the defer mechanism that captured named return values and making the control flow more explicit and easier to follow. Changes: - Remove defer func() from sendMsg that routed responses - Move response routing to sender() after sendMsg returns - Simplify routing condition to: err != nil || req.WaitSendDone - Always deliver errors to caller - Only deliver success when WaitSendDone=true - Make sendMsg focused solely on sending messages This refactoring maintains the same semantics while improving code clarity and separating concerns between sending (sendMsg) and routing (sender).
This replaces the custom "break" first stream with the shared breakStreamServer that will break every stream. This is not weaker than the custom serverFn, but rather creates more chaos.
Refactor TestChannelContext to use the previously unused holdServer function for "CancelDuringSend" test cases, replacing the arbitrary 3-second delay with delayServer. Changes: - Replace serverDelay field with serverFn in test cases - Use echoServer for "CancelBeforeSend" tests (responds promptly) - Use holdServer for "CancelDuringSend" tests (blocks indefinitely) This makes test intent clearer: holdServer blocks until context cancellation, which is semantically more accurate for testing context timeout behavior during send operations.
This fixes lint issues raised by errcheck on Close() and Server() calls; we add some checks for err and ignore some of the Close() calls since they are done in cleanup.
clearStream held streamMut while calling requeuePendingMsgs, which calls Enqueue and may block on a full sendQ. Concurrently, the sender() goroutine dequeues from sendQ but then blocks in ensureStream() waiting for streamMut, resulting in a deadlock. The fix: move requeuePendingMsgs out of clearStream to the two call sites (receiver and sendMsg), so streamMut is never held across Enqueue calls. Add TestChannelClearStreamDeadlock to reproduce and verify the fix.
Combining WaitSendDone=true with Streaming=true causes double delivery: sender() routes a send-done response on send completion, but because Streaming=true the router entry is kept alive, so the server's echo response is routed a second time to the same ResponseChan. Add a panic guard at the API boundary (Enqueue) since the combination is never valid: WaitSendDone is for one-way calls (unicast/multicast) and Streaming is for correctable calls. Fold the panic case into TestChannelRouterLifecycle as a wantPanic field, in the WaitSendDone/Streaming/Invalid subtest alongside the three valid combinations.
Remove the sendMsg helper and fold its logic directly into the sender goroutine loop. Each outcome is now an explicit continue or fall-through, making the control flow self-documenting: - ensureStream error, cancelled context, nil stream: replyError + continue (request never enters the router; no routeResponse lookup needed) - Send failure: requeuePendingMsgs removes the registered entry; continue skips routeResponse since the entry is already gone - Send success, WaitSendDone=true: routeResponse delivers the confirmation for one-way calls without IgnoreErrors(); (entry is guaranteed to exist) - Send success, WaitSendDone=false: router entry stays alive for receiver() to deliver the actual server response (two-way calls) The router registration is now guarded by all early-exit checks, so responseRouters only ever holds requests that are genuinely in-flight on the current stream. This eliminates orphan entries for cancelled or stream-down requests and removes the previously "harmless" no-op routeResponse calls on early-exit paths.
Extract the gRPC NodeStream implementation from the gorums package into a new internal/stream.Server type, keeping only the public API surface (Server, ServerCtx, Interceptor, Handler) in the gorums package. Introduce a MessageHandler callback type in the stream package so that NodeStream can remain decoupled from gorums.Message. The gorums package wires this callback in RegisterHandler: it creates the ServerCtx, unmarshals the request, wraps it in a gorums.Message, runs the interceptor chain, and dispatches the response — all without the stream package needing to know about gorums-level types. stream/server.go adds: - Server struct (implements GorumsServer via NodeStream) - MessageHandler callback type - NewServer(), RegisterHandler() server.go removes: - streamServer struct, newStreamServer(), NodeStream() (~80 lines) - gorums.Server.srv changes from *streamServer to *stream.Server - RegisterHandler now injects the gorums.Message wrapping callback
Contributor
DeepSource reviewed changes in the commit range For detailed review results, please see the PR on DeepSource ↗ PR Report Card
Code Review Summary
How are these analyzer statuses calculated?Administrators can configure which issue categories are reported and cause analysis to be marked as failed when detected. This helps prevent bad and insecure code from being introduced in the codebase. If you're an administrator, you can modify this in the repository's settings. |
Base automatically changed from
meling/268/move-channel-to-stream-pkg
to
master
February 21, 2026 17:02
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Move
NodeStreamhandler tointernal/streamMotivation
As part of the ongoing refactor to consolidate stream infrastructure in
internal/stream, this PR moves the gRPCNodeStreamimplementation outof the top-level
gorumspackage and into a newinternal/stream.Servertype.
Changes
New file:
internal/stream/server.goIntroduces
stream.Server, which implements the gRPCGorumsServerinterface. It owns the per-connection receive/send loop and mutex-based
request ordering, and dispatches each incoming message to a registered
MessageHandlercallback:The callback design decouples
NodeStreamfromgorums.Message, keepingthe stream package free of any dependency on gorums-level types.
Modified:
server.gostreamServer,newStreamServer(), andNodeStream().Server.srvchanges from*streamServerto*stream.Server.RegisterHandlerinjects aMessageHandlerclosure that handlesServerCtxcreation, request unmarshaling,gorums.Messagewrapping,interceptor chaining, and response dispatch.
Design
The boundary is a callback:
internal/streamowns the transport loop;gorumsowns the application-layer message handling. This preserves thepublic API (
gorums.Server,gorums.ServerCtx,gorums.Interceptor,gorums.Handler,gorums.Message) with no changes to generated code oruser-facing types.
Testing
All existing tests pass. The two pre-existing failures in
TestChannelEnsureStreamandTestChannelStreamReadyAfterReconnectareunrelated to this change.
Fixes #269