Skip to content

Commit e8182ae

Browse files
committed
feat: clean shutdown
Signed-off-by: Chris Gianelloni <[email protected]>
1 parent 779f0e2 commit e8182ae

File tree

7 files changed

+277
-40
lines changed

7 files changed

+277
-40
lines changed

connmanager/connection_manager.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
package connmanager
1616

1717
import (
18+
"context"
19+
"errors"
1820
"io"
1921
"log/slog"
22+
"net"
2023
"sync"
2124

2225
"github.com/blinklabs-io/dingo/event"
@@ -36,8 +39,8 @@ const (
3639

3740
type connectionInfo struct {
3841
conn *ouroboros.Connection
39-
isInbound bool
4042
peerAddr string
43+
isInbound bool
4144
}
4245

4346
type peerConnectionState struct {
@@ -47,19 +50,22 @@ type peerConnectionState struct {
4750

4851
type ConnectionManager struct {
4952
connections map[ouroboros.ConnectionId]*connectionInfo
53+
metrics *connectionManagerMetrics
54+
listeners []net.Listener
5055
config ConnectionManagerConfig
5156
connectionsMutex sync.Mutex
52-
metrics *connectionManagerMetrics
57+
listenersMutex sync.Mutex
58+
closing bool
5359
}
5460

5561
type ConnectionManagerConfig struct {
62+
PromRegistry prometheus.Registerer
5663
Logger *slog.Logger
5764
EventBus *event.EventBus
5865
ConnClosedFunc ConnectionManagerConnClosedFunc
5966
Listeners []ListenerConfig
6067
OutboundConnOpts []ouroboros.ConnectionOptionFunc
6168
OutboundSourcePort uint
62-
PromRegistry prometheus.Registerer
6369
}
6470

6571
type connectionManagerMetrics struct {
@@ -211,6 +217,57 @@ func (c *ConnectionManager) Start() error {
211217
return nil
212218
}
213219

220+
func (c *ConnectionManager) Stop(ctx context.Context) error {
221+
var err error
222+
223+
c.config.Logger.Info("stopping connection manager")
224+
225+
// Mark closing to suppress accept-loop noise
226+
c.listenersMutex.Lock()
227+
c.closing = true
228+
c.listenersMutex.Unlock()
229+
230+
// Stop accepting new connections
231+
c.stopListeners()
232+
233+
// Close all existing connections gracefully
234+
c.connectionsMutex.Lock()
235+
conns := make([]*ouroboros.Connection, 0, len(c.connections))
236+
for _, info := range c.connections {
237+
conns = append(conns, info.conn)
238+
}
239+
c.connectionsMutex.Unlock()
240+
241+
// Close connections
242+
for _, conn := range conns {
243+
if conn != nil {
244+
if closeErr := conn.Close(); closeErr != nil {
245+
err = errors.Join(err, closeErr)
246+
}
247+
}
248+
}
249+
250+
c.config.Logger.Info("connection manager stopped")
251+
return err
252+
}
253+
254+
func (c *ConnectionManager) stopListeners() {
255+
c.listenersMutex.Lock()
256+
defer c.listenersMutex.Unlock()
257+
258+
for _, listener := range c.listeners {
259+
if listener != nil {
260+
if err := listener.Close(); err != nil {
261+
c.config.Logger.Warn(
262+
"error closing listener",
263+
"error", err,
264+
)
265+
}
266+
}
267+
}
268+
c.listeners = nil
269+
}
270+
214271
func (c *ConnectionManager) AddConnection(
215272
conn *ouroboros.Connection,
216273
isInbound bool,

connmanager/listener.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ package connmanager
1616

1717
import (
1818
"context"
19+
"errors"
1920
"fmt"
2021
"net"
2122
"runtime"
23+
"strings"
2224

2325
"github.com/blinklabs-io/dingo/event"
2426
ouroboros "github.com/blinklabs-io/gouroboros"
@@ -81,6 +83,11 @@ func (c *ConnectionManager) startListener(l ListenerConfig) error {
8183
)
8284
}
8385
}
86+
// Track listener for shutdown
87+
c.listenersMutex.Lock()
88+
c.listeners = append(c.listeners, l.Listener)
89+
c.listenersMutex.Unlock()
90+
8491
// Build connection options
8592
defaultConnOpts := []ouroboros.ConnectionOptionFunc{
8693
ouroboros.WithLogger(c.config.Logger),
@@ -96,6 +103,29 @@ func (c *ConnectionManager) startListener(l ListenerConfig) error {
96103
// Accept connection
97104
conn, err := l.Listener.Accept()
98105
if err != nil {
106+
// If we're closing, exit quietly
107+
c.listenersMutex.Lock()
108+
isClosing := c.closing
109+
c.listenersMutex.Unlock()
110+
if isClosing {
111+
c.config.Logger.Debug("listener: shutting down, stopping accept loop")
112+
return
113+
}
114+
// During shutdown, closing the listener will cause Accept to return
115+
// a net.ErrClosed. Treat this as a normal termination and exit the loop
116+
if errors.Is(err, net.ErrClosed) || strings.Contains(err.Error(), "use of closed network connection") {
117+
c.config.Logger.Debug("listener: closed, stopping accept loop")
118+
return
119+
}
120+
// Some platforms may return temporary errors; handle and continue
121+
var ne net.Error
122+
if errors.As(err, &ne) && ne.Temporary() {
123+
c.config.Logger.Warn(
124+
fmt.Sprintf("listener: temporary accept error: %s", err),
125+
)
126+
continue
127+
}
128+
// Otherwise, log at error level and continue
99129
c.config.Logger.Error(
100130
fmt.Sprintf("listener: accept failed: %s", err),
101131
)

internal/integration/benchmark_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,12 @@ func loadBlockData(numBlocks int) ([][]byte, error) {
9999

100100
// getTestBackends returns a slice of test backends for benchmarking
101101
func getTestBackends(b *testing.B, diskDataDir string) []struct {
102-
name string
103102
config *database.Config
103+
name string
104104
} {
105105
backends := []struct {
106-
name string
107106
config *database.Config
107+
name string
108108
}{
109109
{
110110
name: "memory",
@@ -133,8 +133,8 @@ func getTestBackends(b *testing.B, diskDataDir string) []struct {
133133
// Use path prefix for isolation instead of unique bucket names
134134
testPrefix := strings.ReplaceAll(b.Name(), "/", "-")
135135
backends = append(backends, struct {
136-
name string
137136
config *database.Config
137+
name string
138138
}{
139139
name: "GCS",
140140
config: &database.Config{
@@ -153,8 +153,8 @@ func getTestBackends(b *testing.B, diskDataDir string) []struct {
153153
// Use path prefix for isolation instead of unique bucket names
154154
testPrefix := strings.ReplaceAll(b.Name(), "/", "-")
155155
backends = append(backends, struct {
156-
name string
157156
config *database.Config
157+
name string
158158
}{
159159
name: "S3",
160160
config: &database.Config{

internal/node/node.go

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,17 @@ func Run(cfg *config.Config, logger *slog.Logger) error {
142142
"component",
143143
"node",
144144
)
145+
metricsServer := &http.Server{
146+
Addr: fmt.Sprintf(
147+
"%s:%d",
148+
cfg.BindAddr,
149+
cfg.MetricsPort,
150+
),
151+
ReadHeaderTimeout: 60 * time.Second,
152+
}
145153
go func() {
146-
debugger := &http.Server{
147-
Addr: fmt.Sprintf(
148-
"%s:%d",
149-
cfg.BindAddr,
150-
cfg.MetricsPort,
151-
),
152-
ReadHeaderTimeout: 60 * time.Second,
153-
}
154-
if err := debugger.ListenAndServe(); err != nil {
154+
if err := metricsServer.ListenAndServe(); err != nil &&
155+
err != http.ErrServerClosed {
155156
logger.Error(
156157
fmt.Sprintf("failed to start metrics listener: %s", err),
157158
"component", "node",
@@ -166,21 +167,52 @@ func Run(cfg *config.Config, logger *slog.Logger) error {
166167
syscall.SIGTERM,
167168
)
168169
defer signalCtxStop()
170+
171+
// Run node in goroutine
172+
errChan := make(chan error, 1)
169173
go func() {
170-
<-signalCtx.Done()
171-
logger.Info("signal received, shutting down")
172-
if err := d.Stop(); err != nil { //nolint:contextcheck
173-
logger.Error(
174-
"failure(s) while shutting down",
175-
"error",
176-
err,
177-
)
174+
if err := d.Run(); err != nil {
175+
errChan <- err
178176
}
179-
os.Exit(0)
180177
}()
181-
// Run node
182-
if err := d.Run(); err != nil {
178+
179+
// Wait for signal or error
180+
select {
181+
case <-signalCtx.Done():
182+
logger.Info("signal received, initiating graceful shutdown")
183+
184+
// Shutdown metrics server
185+
shutdownCtx, cancel := context.WithTimeout(
186+
context.Background(),
187+
5*time.Second,
188+
)
189+
if err := metricsServer.Shutdown(shutdownCtx); err != nil {
190+
logger.Error("metrics server shutdown error", "error", err)
191+
}
192+
cancel()
193+
194+
// Shutdown node
195+
if err := d.Stop(); err != nil {
196+
logger.Error("shutdown errors occurred", "error", err)
197+
return err
198+
}
199+
logger.Info("shutdown complete")
200+
return nil
201+
202+
case err := <-errChan:
203+
logger.Error("node error", "error", err)
204+
signalCtxStop()
205+
206+
// Cleanup on error
207+
shutdownCtx, cancel := context.WithTimeout(
208+
context.Background(),
209+
5*time.Second,
210+
)
211+
if shutdownErr := metricsServer.Shutdown(shutdownCtx); shutdownErr != nil {
212+
logger.Error("metrics server shutdown error", "error", shutdownErr)
213+
}
214+
cancel()
215+
183216
return err
184217
}
185-
return nil
186218
}

mempool/mempool.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package mempool
1616

1717
import (
18+
"context"
1819
"fmt"
1920
"io"
2021
"log/slog"
@@ -145,6 +146,28 @@ func (m *Mempool) RemoveConsumer(connId ouroboros.ConnectionId) {
145146
m.consumersMutex.Unlock()
146147
}
147148

149+
func (m *Mempool) Stop(ctx context.Context) error {
150+
m.logger.Debug("stopping mempool")
151+
152+
// Stop all consumers
153+
m.consumersMutex.Lock()
154+
for _, consumer := range m.consumers {
155+
if consumer != nil {
156+
consumer.ClearCache()
157+
}
158+
}
159+
m.consumers = make(map[ouroboros.ConnectionId]*MempoolConsumer)
160+
m.consumersMutex.Unlock()
161+
162+
// Clear transactions
163+
m.Lock()
164+
m.transactions = nil
165+
m.Unlock()
166+
167+
m.logger.Debug("mempool stopped")
168+
return nil
169+
}
170+
148171
func (m *Mempool) Consumer(connId ouroboros.ConnectionId) *MempoolConsumer {
149172
m.consumersMutex.Lock()
150173
defer m.consumersMutex.Unlock()

0 commit comments

Comments
 (0)