Skip to content

Commit 5c3389f

Browse files
authored
Merge pull request #851 from l1b0k/feat/healthcheck
refactor(terway-cli): replace socat with native Go TCP listener for h…
2 parents 2d385b9 + eae700f commit 5c3389f

File tree

2 files changed

+118
-13
lines changed

2 files changed

+118
-13
lines changed

cmd/terway-cli/policy.go

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
package main
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
7+
"net"
68
"os"
79
"os/exec"
810
"strings"
911
"syscall"
12+
"time"
1013

1114
"github.com/Jeffail/gabs/v2"
1215
"github.com/samber/lo"
1316
"github.com/spf13/cobra"
1417

1518
"github.com/AliyunContainerService/terway/pkg/utils/nodecap"
1619
"github.com/AliyunContainerService/terway/types"
20+
ctrl "sigs.k8s.io/controller-runtime/pkg/manager/signals"
1721
)
1822

1923
var readFunc func(name string) ([]byte, error)
@@ -113,10 +117,10 @@ func initPolicy(cmd *cobra.Command, args []string) error {
113117
if err != nil {
114118
return err
115119
}
116-
return runSocat(cfg)
120+
return runHealthCheckServer(ctrl.SetupSignalHandler(), cfg)
117121
}
118122
if !cfg.HasCiliumChainer {
119-
return runSocat(cfg)
123+
return runHealthCheckServer(ctrl.SetupSignalHandler(), cfg)
120124
}
121125
fmt.Printf("enable ebpf provider, run cilium")
122126
fallthrough
@@ -139,7 +143,7 @@ func runExclusiveENI(cfg *PolicyConfig) error {
139143
}
140144
}
141145

142-
return runSocat(cfg)
146+
return runHealthCheckServer(ctrl.SetupSignalHandler(), cfg)
143147
}
144148

145149
func runCalico(cfg *PolicyConfig) error {
@@ -357,22 +361,79 @@ func cleanUPFelix() error {
357361
return nil
358362
}
359363

360-
func runSocat(cfg *PolicyConfig) error {
364+
func runHealthCheckServer(ctx context.Context, cfg *PolicyConfig) error {
361365
port := cfg.HealthCheckPort
362366
if port == "" {
363367
port = "9099"
364368
}
365-
args := []string{
366-
"socat",
367-
fmt.Sprintf("TCP-LISTEN:%s,bind=127.0.0.1,fork,reuseaddr", port),
368-
"system:'sleep 2;kill -9 $SOCAT_PID 2>/dev/null'",
369-
}
370-
env := os.Environ()
371-
binary, err := exec.LookPath("socat")
369+
addr := "127.0.0.1:" + port
370+
371+
ln, err := net.Listen("tcp", addr)
372372
if err != nil {
373-
return fmt.Errorf("socat is not installed %w", err)
373+
return fmt.Errorf("failed to listen on %s: %w", addr, err)
374+
}
375+
defer ln.Close()
376+
377+
fmt.Printf("Health check TCP server started on %s\n", addr)
378+
379+
// Channel to signal server shutdown
380+
shutdownCh := make(chan struct{})
381+
382+
// Graceful shutdown handler
383+
go func() {
384+
<-ctx.Done()
385+
fmt.Printf("Shutting down health check server...\n")
386+
ln.Close()
387+
close(shutdownCh)
388+
}()
389+
390+
// Semaphore to limit concurrent connections
391+
const maxConcurrentConns = 100
392+
connSem := make(chan struct{}, maxConcurrentConns)
393+
394+
for {
395+
conn, err := ln.Accept()
396+
if err != nil {
397+
// Check if we're shutting down
398+
select {
399+
case <-shutdownCh:
400+
fmt.Printf("Health check server stopped\n")
401+
return nil
402+
default:
403+
}
404+
405+
// Handle network errors
406+
if ne, ok := err.(net.Error); ok {
407+
if ne.Timeout() {
408+
fmt.Fprintf(os.Stderr, "accept timeout: %v\n", err)
409+
continue
410+
}
411+
}
412+
413+
// For other persistent errors, log and continue
414+
_, _ = fmt.Fprintf(os.Stderr, "accept error: %v, continuing...\n", err)
415+
time.Sleep(100 * time.Millisecond) // Brief pause to avoid busy loop
416+
continue
417+
}
418+
419+
// Handle connection in goroutine with concurrency control
420+
go func(c net.Conn) {
421+
// Acquire semaphore
422+
connSem <- struct{}{}
423+
defer func() {
424+
<-connSem // Release semaphore
425+
c.Close()
426+
}()
427+
428+
// Set connection timeout
429+
_ = c.SetDeadline(time.Now().Add(5 * time.Second))
430+
431+
// For health checks, we typically just need to accept the connection
432+
// and close it immediately to indicate the service is healthy
433+
434+
_, _ = c.Write([]byte("OK\n"))
435+
}(conn)
374436
}
375-
return syscall.Exec(binary, args, env)
376437
}
377438

378439
func mutateCiliumArgs(in []string) ([]string, error) {

cmd/terway-cli/policy_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package main
22

33
import (
4+
"bufio"
5+
"context"
46
"fmt"
7+
"net"
58
"os"
69
"testing"
10+
"time"
711

812
"github.com/Jeffail/gabs/v2"
913
"github.com/stretchr/testify/assert"
@@ -255,3 +259,43 @@ func Test_mutateCiliumArgs(t *testing.T) {
255259
})
256260
}
257261
}
262+
263+
func Test_runHealthCheckServer(t *testing.T) {
264+
cfg := &PolicyConfig{
265+
HealthCheckPort: "18080", // Avoid conflicts by choosing a test port
266+
}
267+
268+
ctx, cancel := context.WithCancel(context.Background())
269+
defer cancel()
270+
271+
// Start the server
272+
go func() {
273+
err := runHealthCheckServer(ctx, cfg)
274+
if err != nil {
275+
t.Errorf("runHealthCheckServer error: %v", err)
276+
}
277+
}()
278+
279+
// Wait for the server to start
280+
time.Sleep(200 * time.Millisecond)
281+
282+
// Connect to the server
283+
conn, err := net.Dial("tcp", "127.0.0.1:"+cfg.HealthCheckPort)
284+
if err != nil {
285+
t.Fatalf("failed to connect: %v", err)
286+
}
287+
defer conn.Close()
288+
289+
// Read the response content
290+
resp, err := bufio.NewReader(conn).ReadString('\n')
291+
if err != nil {
292+
t.Fatalf("failed to read: %v", err)
293+
}
294+
if resp != "OK\n" {
295+
t.Errorf("unexpected response: %q", resp)
296+
}
297+
298+
// Stop the server
299+
cancel()
300+
time.Sleep(100 * time.Millisecond)
301+
}

0 commit comments

Comments
 (0)