Skip to content

Comments

Move the NodeStream server to internal/stream#271

Open
meling wants to merge 21 commits intomasterfrom
meling/269/move-server-to-stream-pkg
Open

Move the NodeStream server to internal/stream#271
meling wants to merge 21 commits intomasterfrom
meling/269/move-server-to-stream-pkg

Conversation

@meling
Copy link
Member

@meling meling commented Feb 20, 2026

Move NodeStream handler to internal/stream

Motivation

As part of the ongoing refactor to consolidate stream infrastructure in
internal/stream, this PR moves the gRPC NodeStream implementation out
of the top-level gorums package and into a new internal/stream.Server
type.

Changes

New file: internal/stream/server.go

Introduces stream.Server, which implements the gRPC GorumsServer
interface. It owns the per-connection receive/send loop and mutex-based
request ordering, and dispatches each incoming message to a registered
MessageHandler callback:

type MessageHandler func(ctx context.Context, mut *sync.Mutex, finished chan<- *Message, msg *Message)

The callback design decouples NodeStream from gorums.Message, keeping
the stream package free of any dependency on gorums-level types.

Modified: server.go

  • Removes streamServer, newStreamServer(), and NodeStream().
  • Server.srv changes from *streamServer to *stream.Server.
  • RegisterHandler injects a MessageHandler closure that handles
    ServerCtx creation, request unmarshaling, gorums.Message wrapping,
    interceptor chaining, and response dispatch.

Design

The boundary is a callback: internal/stream owns the transport loop;
gorums owns the application-layer message handling. This preserves the
public API (gorums.Server, gorums.ServerCtx, gorums.Interceptor,
gorums.Handler, gorums.Message) with no changes to generated code or
user-facing types.

Testing

All existing tests pass. The two pre-existing failures in
TestChannelEnsureStream and TestChannelStreamReadyAfterReconnect are
unrelated to this change.

Fixes #269

This commit moves the core stream management logic from the `gorums`
package into a new `internal/stream` package to improve encapsulation
and testability.

Key changes:
- Moved channel.go to `internal/stream` containing `Channel`, `Request`,
  and `Message` types along with their associated logic.
- `Channel` now encapsulates the `grpc.ClientStream` lifecycle,
  handling connection establishment, reconnection, and message routing.
- The `Node` struct delegates underlying communication and metrics
  (Latency, LastErr) to `Channel`.
- Refactored `sender` and `receiver` loops in `Channel` for better
  robustness and to properly drain the send queue on closure, fixing
  potential deadlocks.
- Updated `channel_test.go` with robust, table-driven tests covering
  connection states, context cancellation, and concurrent sends.
- Added benchmarks for channel performance and stream reconnection.
The UnconnectedNodeHasNoStream case failed because the test helper
newChannelWithoutStream would reuse an already Ready connection from the
setupChannel, causing ensureStream to succeed when the test case
initialization expected it to fail. We fixed this by using the
setupChannelWithoutServer test helper instead, which does not create a
server connection initially.
Previously, Enqueue() registered each request in responseRouters before it
was sent on the stream. This meant cancelPendingMsgs() could cancel requests
that were still on sendQ or were waiting for a stream to become available,
i.e., requests that had never been in-flight at all.

Move registration to sendMsg(), immediately before stream.Send(), so the
router only holds requests that are being actively transmitted. Introduce
replyError() to deliver errors to requests that are not yet registered:
send-queue drain on close, ensureStream failures, and Enqueue failures.

Update sender() to use routeResponse() after a sendMsg error, since the
request is now registered in the router at that point and routeResponse
handles map cleanup correctly.

In channel_test.go, add waitForDisconnection() and rewrite
TestChannelStreamReadyAfterReconnect to drive the stream break via the
server (breakStreamServer) rather than calling clearStream() directly. The
old test raced with the receiver goroutine's own clearStream() call, causing
the test to hang waiting on the streamReady channel.

Remaining issue: cancelPendingMsgs is still too coarse. If sendMsg's own
clearStream() (on stream.Send failure) clears S1 and the sender immediately
creates S2 and registers a new request, the receiver's subsequent
cancelPendingMsgs for S1's Recv error will spuriously cancel that request.
The receiver's clearStream() may also kill the healthy S2. A follow-up
commit will provide fix for this issue as well.
This commit addresses a critical race condition where the receiver could
cancel a newly established stream's context when calling clearStream on a
stale stream reference. It also introduces automatic retry for non-streaming
requests when the stream breaks.

Key changes:

- clearStream now takes a stale stream parameter and only clears if it
  matches the current stream, preventing spurious cancellation of new streams
- Added requeuePendingMsgs to automatically requeue non-streaming requests
  for retry on the next stream, improving resilience
- Streaming requests are still cancelled since they are bound to a specific
  stream and cannot be meaningfully retried
- Added TestChannelConcurrentStreamReconnect to validate correct handling of
  concurrent requests during stream reconnection

This ensures requests sent on the current stream are never spuriously
cancelled by cleanup of a stale stream, and non-streaming requests are
automatically retried when the stream breaks.
This reuses Enqueue() to requeue messages sent on the old stream, so
that context cancellation can cancel such messages.
This avoids holding the lock when sending on the ResponseChan channel,
either as an actual response or due to cancellation of messages.
This avoids a select call at the end of every receiver() goroutine
loop iteration. Instead we only break out if the stream.Recv() call
failed with a context cancelled error.
The goroutine that monitored request context cancellation during
stream.Send() was defensive but unnecessary. It attempted to trigger
reconnection via clearStream when a request context was cancelled
mid-send, but this is redundant because:

1. If stream.Send() fails for any reason (including timeout/cancel),
   it already calls clearStream to handle the failure
2. If the caller is impatient (short context timeout), triggering
   reconnection penalizes all other pending requests unnecessarily
3. Tests pass consistently without this logic (verified with -count=100)

This simplifies the sendMsg method and removes the overhead of
creating a channel and spawning a monitoring goroutine for every
send operation.
Move response routing logic for one-way calls from sendMsg to sender(),
eliminating the defer mechanism that captured named return values and
making the control flow more explicit and easier to follow.

Changes:
- Remove defer func() from sendMsg that routed responses
- Move response routing to sender() after sendMsg returns
- Simplify routing condition to: err != nil || req.WaitSendDone
  - Always deliver errors to caller
  - Only deliver success when WaitSendDone=true
- Make sendMsg focused solely on sending messages

This refactoring maintains the same semantics while improving code
clarity and separating concerns between sending (sendMsg) and
routing (sender).
This replaces the custom "break" first stream with the shared
breakStreamServer that will break every stream. This is not weaker than
the custom serverFn, but rather creates more chaos.
Refactor TestChannelContext to use the previously unused holdServer
function for "CancelDuringSend" test cases, replacing the arbitrary
3-second delay with delayServer.

Changes:
- Replace serverDelay field with serverFn in test cases
- Use echoServer for "CancelBeforeSend" tests (responds promptly)
- Use holdServer for "CancelDuringSend" tests (blocks indefinitely)

This makes test intent clearer: holdServer blocks until context
cancellation, which is semantically more accurate for testing
context timeout behavior during send operations.
This fixes lint issues raised by errcheck on Close() and Server()
calls; we add some checks for err and ignore some of the Close()
calls since they are done in cleanup.
clearStream held streamMut while calling requeuePendingMsgs, which
calls Enqueue and may block on a full sendQ. Concurrently, the
sender() goroutine dequeues from sendQ but then blocks in
ensureStream() waiting for streamMut, resulting in a deadlock.

The fix: move requeuePendingMsgs out of clearStream to the two
call sites (receiver and sendMsg), so streamMut is never held
across Enqueue calls.

Add TestChannelClearStreamDeadlock to reproduce and verify the fix.
Combining WaitSendDone=true with Streaming=true causes double delivery:
sender() routes a send-done response on send completion, but because
Streaming=true the router entry is kept alive, so the server's echo
response is routed a second time to the same ResponseChan.

Add a panic guard at the API boundary (Enqueue) since the combination
is never valid: WaitSendDone is for one-way calls (unicast/multicast)
and Streaming is for correctable calls.

Fold the panic case into TestChannelRouterLifecycle as a wantPanic field,
in the WaitSendDone/Streaming/Invalid subtest alongside the three valid
combinations.
Remove the sendMsg helper and fold its logic directly into the sender
goroutine loop. Each outcome is now an explicit continue or fall-through,
making the control flow self-documenting:

- ensureStream error, cancelled context, nil stream: replyError + continue
  (request never enters the router; no routeResponse lookup needed)
- Send failure: requeuePendingMsgs removes the registered entry; continue
  skips routeResponse since the entry is already gone
- Send success, WaitSendDone=true: routeResponse delivers the confirmation
  for one-way calls without IgnoreErrors(); (entry is guaranteed to exist)
- Send success, WaitSendDone=false: router entry stays alive for receiver()
  to deliver the actual server response (two-way calls)

The router registration is now guarded by all early-exit checks, so
responseRouters only ever holds requests that are genuinely in-flight on
the current stream. This eliminates orphan entries for cancelled or
stream-down requests and removes the previously "harmless" no-op
routeResponse calls on early-exit paths.
Extract the gRPC NodeStream implementation from the gorums package into
a new internal/stream.Server type, keeping only the public API surface
(Server, ServerCtx, Interceptor, Handler) in the gorums package.

Introduce a MessageHandler callback type in the stream package so that
NodeStream can remain decoupled from gorums.Message. The gorums package
wires this callback in RegisterHandler: it creates the ServerCtx, unmarshals
the request, wraps it in a gorums.Message, runs the interceptor chain,
and dispatches the response — all without the stream package needing to
know about gorums-level types.

stream/server.go adds:
- Server struct (implements GorumsServer via NodeStream)
- MessageHandler callback type
- NewServer(), RegisterHandler()

server.go removes:
- streamServer struct, newStreamServer(), NodeStream() (~80 lines)
- gorums.Server.srv changes from *streamServer to *stream.Server
- RegisterHandler now injects the gorums.Message wrapping callback
@deepsource-io
Copy link
Contributor

deepsource-io bot commented Feb 20, 2026

DeepSource Code Review

DeepSource reviewed changes in the commit range 5f902d5...ea6ac9e on this pull request. Below is the summary for the review, and you can see the individual issues we found as review comments.

For detailed review results, please see the PR on DeepSource ↗

PR Report Card

Security × 0 issues Overall PR Quality   

Reliability × 0 issues
Complexity × 0 issues
Hygiene × 0 issues

Code Review Summary

Analyzer Status Summary Details
Go No new issues detected. Review ↗
Shell No new issues detected. Review ↗
How are these analyzer statuses calculated?

Administrators can configure which issue categories are reported and cause analysis to be marked as failed when detected. This helps prevent bad and insecure code from being introduced in the codebase. If you're an administrator, you can modify this in the repository's settings.

@meling meling changed the base branch from master to meling/268/move-channel-to-stream-pkg February 20, 2026 07:49
Base automatically changed from meling/268/move-channel-to-stream-pkg to master February 21, 2026 17:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

chore: split server.go between the gorums package and the internal/stream package

1 participant