@@ -28,7 +28,6 @@ import (
2828 "flag"
2929 "fmt"
3030 "io"
31- "log"
3231 pseudorand "math/rand"
3332 "net"
3433 "net/http"
@@ -46,6 +45,7 @@ import (
4645 "github.com/gorilla/securecookie"
4746 "github.com/gorilla/websocket"
4847 "github.com/mailru/easyjson"
48+ "go.uber.org/zap"
4949
5050 signaling "github.com/strukturag/nextcloud-spreed-signaling"
5151)
@@ -81,6 +81,8 @@ const (
8181)
8282
8383type Stats struct {
84+ log * zap.Logger
85+
8486 numRecvMessages atomic.Uint64
8587 numSentMessages atomic.Uint64
8688 resetRecvMessages uint64
@@ -107,10 +109,13 @@ func (s *Stats) Log() {
107109 sentMessages := totalSentMessages - s .resetSentMessages
108110 totalRecvMessages := s .numRecvMessages .Load ()
109111 recvMessages := totalRecvMessages - s .resetRecvMessages
110- log .Printf ("Stats: sent=%d (%d/sec), recv=%d (%d/sec), delta=%d" ,
111- totalSentMessages , sentMessages / perSec ,
112- totalRecvMessages , recvMessages / perSec ,
113- totalSentMessages - totalRecvMessages )
112+ s .log .Info ("Stats updated" ,
113+ zap .Uint64 ("sent" , totalSentMessages ),
114+ zap .Uint64 ("sentspeed" , sentMessages / perSec ),
115+ zap .Uint64 ("recv" , totalRecvMessages ),
116+ zap .Uint64 ("recvspeed" , recvMessages / perSec ),
117+ zap .Uint64 ("delta" , totalSentMessages - totalRecvMessages ),
118+ )
114119 s .reset (now )
115120}
116121
@@ -119,6 +124,7 @@ type MessagePayload struct {
119124}
120125
121126type SignalingClient struct {
127+ log * zap.Logger
122128 readyWg * sync.WaitGroup
123129 cookie * securecookie.SecureCookie
124130
@@ -135,13 +141,14 @@ type SignalingClient struct {
135141 userId string
136142}
137143
138- func NewSignalingClient (cookie * securecookie.SecureCookie , url string , stats * Stats , readyWg * sync.WaitGroup , doneWg * sync.WaitGroup ) (* SignalingClient , error ) {
144+ func NewSignalingClient (log * zap. Logger , cookie * securecookie.SecureCookie , url string , stats * Stats , readyWg * sync.WaitGroup , doneWg * sync.WaitGroup ) (* SignalingClient , error ) {
139145 conn , _ , err := websocket .DefaultDialer .Dial (url , nil )
140146 if err != nil {
141147 return nil , err
142148 }
143149
144150 client := & SignalingClient {
151+ log : log ,
145152 readyWg : readyWg ,
146153 cookie : cookie ,
147154
@@ -204,13 +211,19 @@ func (c *SignalingClient) processMessage(message *signaling.ServerMessage) {
204211 case "message" :
205212 c .processMessageMessage (message )
206213 case "bye" :
207- log .Printf ("Received bye: %+v" , message .Bye )
214+ c .log .Error ("Received bye" ,
215+ zap .Any ("bye" , message .Bye ),
216+ )
208217 c .Close ()
209218 case "error" :
210- log .Printf ("Received error: %+v" , message .Error )
219+ c .log .Error ("Received error" ,
220+ zap .Any ("error" , message .Error ),
221+ )
211222 c .Close ()
212223 default :
213- log .Printf ("Unsupported message type: %+v" , * message )
224+ c .log .Warn ("Unsupported message type" ,
225+ zap .Stringer ("message" , message ),
226+ )
214227 }
215228}
216229
@@ -236,7 +249,10 @@ func (c *SignalingClient) processHelloMessage(message *signaling.ServerMessage)
236249 c .privateSessionId = message .Hello .ResumeId
237250 c .publicSessionId = c .privateToPublicSessionId (c .privateSessionId )
238251 c .userId = message .Hello .UserId
239- log .Printf ("Registered as %s (userid %s)" , c .privateSessionId , c .userId )
252+ c .log .Info ("Registered" ,
253+ zap .String ("privateid" , c .privateSessionId ),
254+ zap .String ("userid" , c .userId ),
255+ )
240256 c .readyWg .Done ()
241257}
242258
@@ -249,14 +265,18 @@ func (c *SignalingClient) PublicSessionId() string {
249265func (c * SignalingClient ) processMessageMessage (message * signaling.ServerMessage ) {
250266 var msg MessagePayload
251267 if err := json .Unmarshal (message .Message .Data , & msg ); err != nil {
252- log .Println ("Error in unmarshal" , err )
268+ c .log .Error ("Error in unmarshal" ,
269+ zap .Error (err ),
270+ )
253271 return
254272 }
255273
256274 now := time .Now ()
257275 duration := now .Sub (msg .Now )
258276 if duration > messageReportDuration {
259- log .Printf ("Message took %s" , duration )
277+ c .log .Warn ("Message took too long" ,
278+ zap .Duration ("duration" , duration ),
279+ )
260280 }
261281}
262282
@@ -283,29 +303,37 @@ func (c *SignalingClient) readPump() {
283303 websocket .CloseNormalClosure ,
284304 websocket .CloseGoingAway ,
285305 websocket .CloseNoStatusReceived ) {
286- log .Printf ("Error: %v" , err )
306+ c .log .Error ("Error reading" ,
307+ zap .Error (err ),
308+ )
287309 }
288310 break
289311 }
290312
291313 if messageType != websocket .TextMessage {
292- log .Println ("Unsupported message type" , messageType )
314+ c .log .Error ("Unsupported message type" ,
315+ zap .Int ("type" , messageType ),
316+ )
293317 break
294318 }
295319
296320 decodeBuffer .Reset ()
297321 if _ , err := decodeBuffer .ReadFrom (reader ); err != nil {
298322 c .lock .Lock ()
299323 if c .conn != nil {
300- log .Println ("Error reading message" , err )
324+ c .log .Error ("Error reading message" ,
325+ zap .Error (err ),
326+ )
301327 }
302328 c .lock .Unlock ()
303329 break
304330 }
305331
306332 var message signaling.ServerMessage
307333 if err := message .UnmarshalJSON (decodeBuffer .Bytes ()); err != nil {
308- log .Printf ("Error: %v" , err )
334+ c .log .Error ("Error unmarshalling" ,
335+ zap .Error (err ),
336+ )
309337 break
310338 }
311339
@@ -327,7 +355,10 @@ func (c *SignalingClient) writeInternal(message *signaling.ClientMessage) bool {
327355 return false
328356 }
329357
330- log .Println ("Could not send message" , message , err )
358+ c .log .Error ("Could not send message" ,
359+ zap .Stringer ("message" , message ),
360+ zap .Error (err ),
361+ )
331362 // TODO(jojo): Differentiate between JSON encode errors and websocket errors.
332363 closeData = websocket .FormatCloseMessage (websocket .CloseInternalServerErr , "" )
333364 goto close
@@ -413,29 +444,33 @@ func (c *SignalingClient) SendMessages(clients []*SignalingClient) {
413444 }
414445}
415446
416- func registerAuthHandler (router * mux.Router ) {
447+ func registerAuthHandler (log * zap. Logger , router * mux.Router ) {
417448 router .HandleFunc ("/auth" , func (w http.ResponseWriter , r * http.Request ) {
418449 body , err := io .ReadAll (r .Body )
419450 if err != nil {
420- log .Println ("Error reading body:" , err )
451+ log .Error ("Error reading body" ,
452+ zap .Error (err ),
453+ )
421454 return
422455 }
423456
424457 rnd := r .Header .Get (signaling .HeaderBackendSignalingRandom )
425458 checksum := r .Header .Get (signaling .HeaderBackendSignalingChecksum )
426459 if rnd == "" || checksum == "" {
427- log .Println ("No checksum headers found" )
460+ log .Error ("No checksum headers found" )
428461 return
429462 }
430463
431464 if verify := signaling .CalculateBackendChecksum (rnd , body , backendSecret ); verify != checksum {
432- log .Println ("Backend checksum verification failed" )
465+ log .Error ("Backend checksum verification failed" )
433466 return
434467 }
435468
436469 var request signaling.BackendClientRequest
437470 if err := request .UnmarshalJSON (body ); err != nil {
438- log .Println (err )
471+ log .Error ("Error unmarshalling" ,
472+ zap .Error (err ),
473+ )
439474 return
440475 }
441476
@@ -449,7 +484,9 @@ func registerAuthHandler(router *mux.Router) {
449484
450485 data , err := response .MarshalJSON ()
451486 if err != nil {
452- log .Println (err )
487+ log .Error ("Error marshalling response message" ,
488+ zap .Error (err ),
489+ )
453490 return
454491 }
455492
@@ -467,7 +504,9 @@ func registerAuthHandler(router *mux.Router) {
467504
468505 jsonpayload , err := payload .MarshalJSON ()
469506 if err != nil {
470- log .Println (err )
507+ log .Error ("Error marshalling payload" ,
508+ zap .Error (err ),
509+ )
471510 return
472511 }
473512
@@ -477,10 +516,12 @@ func registerAuthHandler(router *mux.Router) {
477516 })
478517}
479518
480- func getLocalIP () string {
519+ func getLocalIP (log * zap. Logger ) string {
481520 interfaces , err := net .InterfaceAddrs ()
482521 if err != nil {
483- log .Fatal (err )
522+ log .Fatal ("Error getting interfaces" ,
523+ zap .Error (err ),
524+ )
484525 }
485526 for _ , intf := range interfaces {
486527 switch t := intf .(type ) {
@@ -508,11 +549,14 @@ func reverseSessionId(s string) (string, error) {
508549
509550func main () {
510551 flag .Parse ()
511- log .SetFlags (0 )
552+
553+ log := zap .Must (zap .NewDevelopment ())
512554
513555 config , err := goconf .ReadConfigFile (* config )
514556 if err != nil {
515- log .Fatal ("Could not read configuration: " , err )
557+ log .Fatal ("Could not read configuration" ,
558+ zap .Error (err ),
559+ )
516560 }
517561
518562 secret , _ := config .GetString ("backend" , "secret" )
@@ -523,7 +567,9 @@ func main() {
523567 case 32 :
524568 case 64 :
525569 default :
526- log .Printf ("WARNING: The sessions hash key should be 32 or 64 bytes but is %d bytes" , len (hashKey ))
570+ log .Warn ("The sessions hash key should be 32 or 64 bytes" ,
571+ zap .Int ("len" , len (hashKey )),
572+ )
527573 }
528574
529575 blockKey , _ := config .GetString ("sessions" , "blockkey" )
@@ -535,24 +581,30 @@ func main() {
535581 case 24 :
536582 case 32 :
537583 default :
538- log .Fatalf ("The sessions block key must be 16, 24 or 32 bytes but is %d bytes" , len (blockKey ))
584+ log .Fatal ("The sessions block key must be 16, 24 or 32 bytes" ,
585+ zap .Int ("len" , len (blockKey )),
586+ )
539587 }
540588 cookie := securecookie .New ([]byte (hashKey ), blockBytes ).MaxAge (0 )
541589
542590 cpus := runtime .NumCPU ()
543591 runtime .GOMAXPROCS (cpus )
544- log .Printf ("Using a maximum of %d CPUs" , cpus )
592+ log .Debug ("Using number of CPUs" ,
593+ zap .Int ("cpus" , cpus ),
594+ )
545595
546596 interrupt := make (chan os.Signal , 1 )
547597 signal .Notify (interrupt , os .Interrupt )
548598
549599 r := mux .NewRouter ()
550- registerAuthHandler (r )
600+ registerAuthHandler (log , r )
551601
552- localIP := getLocalIP ()
602+ localIP := getLocalIP (log )
553603 listener , err := net .Listen ("tcp" , localIP + ":0" )
554604 if err != nil {
555- log .Fatal (err )
605+ log .Fatal ("Error starting listener" ,
606+ zap .Error (err ),
607+ )
556608 }
557609
558610 server := http.Server {
@@ -562,7 +614,9 @@ func main() {
562614 server .Serve (listener ) // nolint
563615 }()
564616 backendUrl := "http://" + listener .Addr ().String ()
565- log .Println ("Backend server running on" , backendUrl )
617+ log .Info ("Backend server running" ,
618+ zap .String ("url" , backendUrl ),
619+ )
566620
567621 urls := make ([]url.URL , 0 )
568622 urlstrings := make ([]string , 0 )
@@ -575,24 +629,34 @@ func main() {
575629 urls = append (urls , u )
576630 urlstrings = append (urlstrings , u .String ())
577631 }
578- log .Printf ("Connecting to %s" , urlstrings )
632+ log .Info ("Connecting" ,
633+ zap .Strings ("urls" , urlstrings ),
634+ )
579635
580636 clients := make ([]* SignalingClient , 0 )
581- stats := & Stats {}
637+ stats := & Stats {
638+ log : log ,
639+ }
582640
583641 if * maxClients < 2 {
584- log .Fatalf ("Need at least 2 clients, got %d" , * maxClients )
642+ log .Fatal ("Need at least 2 clients" ,
643+ zap .Int ("count" , * maxClients ),
644+ )
585645 }
586646
587- log .Printf ("Starting %d clients" , * maxClients )
647+ log .Info ("Starting clients" ,
648+ zap .Int ("count" , * maxClients ),
649+ )
588650
589651 var doneWg sync.WaitGroup
590652 var readyWg sync.WaitGroup
591653
592654 for i := 0 ; i < * maxClients ; i ++ {
593- client , err := NewSignalingClient (cookie , urls [i % len (urls )].String (), stats , & readyWg , & doneWg )
655+ client , err := NewSignalingClient (log , cookie , urls [i % len (urls )].String (), stats , & readyWg , & doneWg )
594656 if err != nil {
595- log .Fatal (err )
657+ log .Fatal ("Error creating signaling client" ,
658+ zap .Error (err ),
659+ )
596660 }
597661 defer client .Close ()
598662 readyWg .Add (1 )
@@ -612,10 +676,10 @@ func main() {
612676 clients = append (clients , client )
613677 }
614678
615- log .Println ("Clients created" )
679+ log .Info ("Clients created" )
616680 readyWg .Wait ()
617681
618- log .Println ("All connections established" )
682+ log .Info ("All connections established" )
619683
620684 for _ , c := range clients {
621685 doneWg .Add (1 )
@@ -632,14 +696,14 @@ loop:
632696 for {
633697 select {
634698 case <- interrupt :
635- log .Println ("Interrupted" )
699+ log .Info ("Interrupted" )
636700 break loop
637701 case <- report .C :
638702 stats .Log ()
639703 }
640704 }
641705
642- log .Println ("Waiting for clients to terminate ..." )
706+ log .Info ("Waiting for clients to terminate ..." )
643707 for _ , c := range clients {
644708 c .Close ()
645709 }
0 commit comments