Skip to content

Commit 38e105c

Browse files
committed
refactor(internal/stream): move NodeStream handler to stream.Server
Extract the gRPC NodeStream implementation from the gorums package into a new internal/stream.Server type, keeping only the public API surface (Server, ServerCtx, Interceptor, Handler) in the gorums package. Introduce a MessageHandler callback type in the stream package so that NodeStream can remain decoupled from gorums.Message. The gorums package wires this callback in RegisterHandler: it creates the ServerCtx, unmarshals the request, wraps it in a gorums.Message, runs the interceptor chain, and dispatches the response — all without the stream package needing to know about gorums-level types. stream/server.go adds: - Server struct (implements GorumsServer via NodeStream) - MessageHandler callback type - NewServer(), RegisterHandler() server.go removes: - streamServer struct, newStreamServer(), NodeStream() (~80 lines) - gorums.Server.srv changes from *streamServer to *stream.Server - RegisterHandler now injects the gorums.Message wrapping callback
1 parent 5f902d5 commit 38e105c

2 files changed

Lines changed: 106 additions & 84 deletions

File tree

internal/stream/server.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package stream
2+
3+
import (
4+
"context"
5+
"sync"
6+
)
7+
8+
// MessageHandler handles an incoming stream message on the server side.
9+
// It is called in a new goroutine for each received message.
10+
// The handler must eventually release the mutex (by calling mut.Unlock())
11+
// to allow the next request to be processed. The finished channel is used
12+
// to send response messages back to the client.
13+
type MessageHandler func(ctx context.Context, mut *sync.Mutex, finished chan<- *Message, msg *Message)
14+
15+
// Server implements the Gorums gRPC service for handling node streams.
16+
type Server struct {
17+
handlers map[string]MessageHandler
18+
buffer uint
19+
connectCallback func(context.Context)
20+
UnimplementedGorumsServer
21+
}
22+
23+
// NewServer creates a new StreamServer with the given buffer size
24+
// and optional connect callback.
25+
func NewServer(buffer uint, connectCallback func(context.Context)) *Server {
26+
return &Server{
27+
handlers: make(map[string]MessageHandler),
28+
buffer: buffer,
29+
connectCallback: connectCallback,
30+
}
31+
}
32+
33+
// RegisterHandler registers a message handler for the specified method name.
34+
func (s *Server) RegisterHandler(method string, handler MessageHandler) {
35+
s.handlers[method] = handler
36+
}
37+
38+
// NodeStream handles a connection to a single client. The stream is aborted if there
39+
// is any error with sending or receiving.
40+
func (s *Server) NodeStream(srv Gorums_NodeStreamServer) error {
41+
var mut sync.Mutex
42+
finished := make(chan *Message, s.buffer)
43+
ctx := srv.Context()
44+
45+
if s.connectCallback != nil {
46+
s.connectCallback(ctx)
47+
}
48+
49+
go func() {
50+
for {
51+
select {
52+
case <-ctx.Done():
53+
return
54+
case streamOut := <-finished:
55+
if err := srv.Send(streamOut); err != nil {
56+
return
57+
}
58+
}
59+
}
60+
}()
61+
62+
// Start with a locked mutex
63+
mut.Lock()
64+
defer mut.Unlock()
65+
66+
for {
67+
streamIn, err := srv.Recv()
68+
if err != nil {
69+
return err
70+
}
71+
if handler, ok := s.handlers[streamIn.GetMethod()]; ok {
72+
// We start the handler in a new goroutine in order to allow multiple
73+
// handlers to run concurrently. However, to preserve request ordering,
74+
// the handler must unlock the shared mutex when it has either finished,
75+
// or when it is safe to start processing the next request.
76+
go handler(ctx, &mut, finished, streamIn)
77+
// Wait until the handler releases the mutex.
78+
mut.Lock()
79+
}
80+
}
81+
}

server.go

Lines changed: 25 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -20,87 +20,6 @@ type (
2020
Handler func(ServerCtx, *Message) (*Message, error)
2121
)
2222

23-
type streamServer struct {
24-
handlers map[string]Handler
25-
opts *serverOptions
26-
stream.UnimplementedGorumsServer
27-
}
28-
29-
func newStreamServer(opts *serverOptions) *streamServer {
30-
return &streamServer{
31-
handlers: make(map[string]Handler),
32-
opts: opts,
33-
}
34-
}
35-
36-
// NodeStream handles a connection to a single client. The stream is aborted if there
37-
// is any error with sending or receiving.
38-
func (s *streamServer) NodeStream(srv stream.Gorums_NodeStreamServer) error {
39-
var mut sync.Mutex // used to achieve mutex between request handlers
40-
finished := make(chan *stream.Message, s.opts.buffer)
41-
ctx := srv.Context()
42-
43-
if s.opts.connectCallback != nil {
44-
s.opts.connectCallback(ctx)
45-
}
46-
47-
go func() {
48-
for {
49-
select {
50-
case <-ctx.Done():
51-
return
52-
case streamOut := <-finished:
53-
if err := srv.Send(streamOut); err != nil {
54-
return
55-
}
56-
}
57-
}
58-
}()
59-
60-
// Start with a locked mutex
61-
mut.Lock()
62-
defer mut.Unlock()
63-
64-
for {
65-
streamIn, err := srv.Recv()
66-
if err != nil {
67-
return err
68-
}
69-
if handler, ok := s.handlers[streamIn.GetMethod()]; ok {
70-
// We start the handler in a new goroutine in order to allow multiple handlers to run concurrently.
71-
// However, to preserve request ordering, the handler must unlock the shared mutex when it has either
72-
// finished, or when it is safe to start processing the next request.
73-
//
74-
// This func() is the default interceptor; it is the first and last handler in the chain.
75-
// It is responsible for releasing the mutex when the handler chain is done.
76-
go func() {
77-
srvCtx := newServerCtx(streamIn.AppendToIncomingContext(ctx), &mut, finished)
78-
defer srvCtx.Release()
79-
80-
msg, err := stream.UnmarshalRequest(streamIn)
81-
in := &Message{Msg: msg, Message: streamIn}
82-
if err != nil {
83-
_ = srvCtx.SendMessage(messageWithError(in, nil, err))
84-
return
85-
}
86-
87-
out, err := handler(srvCtx, in)
88-
// If there is no response and no error, we do not send anything back to the client.
89-
// This corresponds to a unidirectional message from client to server, where clients
90-
// are not expected to receive a response.
91-
if out == nil && err == nil {
92-
return
93-
}
94-
_ = srvCtx.SendMessage(messageWithError(in, out, err))
95-
// We ignore the error from SendMessage here; it means that the stream is closed.
96-
// The for-loop above will exit on the next Recv call.
97-
}()
98-
// Wait until the handler releases the mutex.
99-
mut.Lock()
100-
}
101-
}
102-
}
103-
10423
type serverOptions struct {
10524
buffer uint
10625
grpcOpts []grpc.ServerOption
@@ -169,7 +88,7 @@ func chainInterceptors(final Handler, interceptors ...Interceptor) Handler {
16988

17089
// Server serves all ordering based RPCs using registered handlers.
17190
type Server struct {
172-
srv *streamServer
91+
srv *stream.Server
17392
grpcServer *grpc.Server
17493
interceptors []Interceptor
17594
}
@@ -181,7 +100,7 @@ func NewServer(opts ...ServerOption) *Server {
181100
opt(&serverOpts)
182101
}
183102
s := &Server{
184-
srv: newStreamServer(&serverOpts),
103+
srv: stream.NewServer(serverOpts.buffer, serverOpts.connectCallback),
185104
grpcServer: grpc.NewServer(serverOpts.grpcOpts...),
186105
interceptors: serverOpts.interceptors,
187106
}
@@ -193,7 +112,29 @@ func NewServer(opts ...ServerOption) *Server {
193112
//
194113
// This function should only be used by generated code.
195114
func (s *Server) RegisterHandler(method string, handler Handler) {
196-
s.srv.handlers[method] = chainInterceptors(handler, s.interceptors...)
115+
wrapped := chainInterceptors(handler, s.interceptors...)
116+
s.srv.RegisterHandler(method, func(ctx context.Context, mut *sync.Mutex, finished chan<- *stream.Message, streamIn *stream.Message) {
117+
srvCtx := newServerCtx(streamIn.AppendToIncomingContext(ctx), mut, finished)
118+
defer srvCtx.Release()
119+
120+
msg, err := stream.UnmarshalRequest(streamIn)
121+
in := &Message{Msg: msg, Message: streamIn}
122+
if err != nil {
123+
_ = srvCtx.SendMessage(messageWithError(in, nil, err))
124+
return
125+
}
126+
127+
out, err := wrapped(srvCtx, in)
128+
// If there is no response and no error, we do not send anything back to the client.
129+
// This corresponds to a unidirectional message from client to server, where clients
130+
// are not expected to receive a response.
131+
if out == nil && err == nil {
132+
return
133+
}
134+
_ = srvCtx.SendMessage(messageWithError(in, out, err))
135+
// We ignore the error from SendMessage here; it means that the stream is closed.
136+
// The for-loop in srv.NodeStream() will exit on the next Recv() call.
137+
})
197138
}
198139

199140
// Serve starts serving on the listener.

0 commit comments

Comments
 (0)