Skip to content

Commit 13ed3b5

Browse files
committed
Add tests for setting bandwidth.
1 parent 972356f commit 13ed3b5

File tree

3 files changed

+293
-3
lines changed

3 files changed

+293
-3
lines changed

mcu_janus_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"context"
2626
"encoding/json"
2727
"maps"
28+
"net/http/httptest"
2829
"strings"
2930
"sync"
3031
"sync/atomic"
@@ -2017,3 +2018,88 @@ func Test_JanusSubscriberUpdateOffer(t *testing.T) {
20172018
// Test MCU will trigger an updated offer.
20182019
client2.RunUntilOffer(ctx, MockSdpOfferAudioOnly)
20192020
}
2021+
2022+
func Test_JanusSetBandwidth(t *testing.T) {
2023+
require := require.New(t)
2024+
assert := assert.New(t)
2025+
2026+
var counter atomic.Int32
2027+
mcu, gateway := newMcuJanusForTesting(t)
2028+
gateway.registerHandlers(map[string]TestJanusHandler{
2029+
"configure": func(room *TestJanusRoom, body, jsep api.StringMap) (any, *janus.ErrorMsg) {
2030+
assert.EqualValues(1, room.id)
2031+
switch counter.Add(1) {
2032+
case 1:
2033+
return &janus.EventMsg{
2034+
Jsep: api.StringMap{
2035+
"type": "answer",
2036+
"sdp": MockSdpAnswerAudioAndVideo,
2037+
},
2038+
}, nil
2039+
case 2:
2040+
// When the room updates the bandwidth per publisher, it will notify
2041+
// Janus about it.
2042+
assert.EqualValues(500000, body["bitrate"], "got %+v", body)
2043+
return &janus.EventMsg{}, nil
2044+
default:
2045+
assert.Fail("too many configure requests", "received body=%+v, jsep=%+v", body, jsep)
2046+
return &janus.ErrorMsg{
2047+
Err: janus.ErrorData{
2048+
Code: JANUS_ERROR_UNKNOWN,
2049+
Reason: "too many configure requests",
2050+
},
2051+
}, nil
2052+
}
2053+
},
2054+
})
2055+
2056+
hub, _, _, server := CreateHubForTestWithConfig(t, func(s *httptest.Server) (*goconf.ConfigFile, error) {
2057+
config, err := getTestConfig(s)
2058+
if err != nil {
2059+
return nil, err
2060+
}
2061+
2062+
config.AddOption("backend", "maxstreambitrate", "700000")
2063+
config.AddOption("backend", "maxscreenbitrate", "800000")
2064+
2065+
config.AddOption("backend", "bitrateperroom", "1000000")
2066+
config.AddOption("backend", "minpublisherbitrate", "10000")
2067+
config.AddOption("backend", "maxpublisherbitrate", "500000")
2068+
return config, err
2069+
})
2070+
hub.SetMcu(mcu)
2071+
2072+
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
2073+
defer cancel()
2074+
2075+
client, hello := NewTestClientWithHello(ctx, t, server, hub, testDefaultUserId+"1")
2076+
2077+
// Join room by id.
2078+
roomId := "test-room"
2079+
roomMsg := MustSucceed2(t, client.JoinRoom, ctx, roomId)
2080+
require.Equal(roomId, roomMsg.Room.RoomId)
2081+
client.RunUntilJoined(ctx, hello.Hello)
2082+
2083+
require.NoError(client.SendMessage(MessageClientMessageRecipient{
2084+
Type: "session",
2085+
SessionId: hello.Hello.SessionId,
2086+
}, MessageClientMessageData{
2087+
Type: "offer",
2088+
RoomType: "video",
2089+
Payload: api.StringMap{
2090+
"sdp": MockSdpOfferAudioAndVideo,
2091+
},
2092+
}))
2093+
2094+
client.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo)
2095+
2096+
pub, err := mcu.getPublisher(ctx, hello.Hello.SessionId, StreamTypeVideo)
2097+
require.NoError(err)
2098+
pub.UpdateBandwidth("video", api.BandwidthFromBits(2000), api.BandwidthFromBits(100000))
2099+
2100+
room := hub.getRoom(roomId)
2101+
require.NotNil(room)
2102+
room.updateBandwidth().Wait()
2103+
2104+
assert.EqualValues(2, counter.Load())
2105+
}

mcu_proxy_test.go

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ import (
4747
"github.com/stretchr/testify/assert"
4848
"github.com/stretchr/testify/require"
4949
"go.etcd.io/etcd/server/v3/embed"
50+
51+
"github.com/strukturag/nextcloud-spreed-signaling/api"
5052
)
5153

5254
const (
@@ -190,7 +192,8 @@ func Test_sortConnectionsForCountryWithOverride(t *testing.T) {
190192
type proxyServerClientHandler func(msg *ProxyClientMessage) (*ProxyServerMessage, error)
191193

192194
type testProxyServerPublisher struct {
193-
id PublicSessionId
195+
id PublicSessionId
196+
bandwidth api.AtomicBandwidth
194197
}
195198

196199
type testProxyServerSubscriber struct {
@@ -290,6 +293,8 @@ func (c *testProxyServerClient) processRegularMessage(msg *ProxyClientMessage) (
290293
switch msg.Type {
291294
case "command":
292295
handler = c.processCommandMessage
296+
case "payload":
297+
handler = c.processPayloadMessage
293298
}
294299

295300
if handler == nil {
@@ -423,6 +428,21 @@ func (c *testProxyServerClient) processCommandMessage(msg *ProxyClientMessage) (
423428
}
424429
c.server.updateLoad(-1)
425430
}
431+
case "update-bandwidth":
432+
pub := c.server.getPublisher(PublicSessionId(msg.Command.ClientId))
433+
if pub == nil {
434+
response = msg.NewWrappedErrorServerMessage(fmt.Errorf("publisher %s not found", msg.Command.ClientId))
435+
return response, nil
436+
}
437+
438+
pub.bandwidth.Store(msg.Command.Bandwidth)
439+
response = &ProxyServerMessage{
440+
Id: msg.Id,
441+
Type: "command",
442+
Command: &CommandProxyServerMessage{
443+
Id: string(pub.id),
444+
},
445+
}
426446
}
427447
if response == nil {
428448
response = msg.NewWrappedErrorServerMessage(fmt.Errorf("command \"%s\" is not implemented", msg.Command.Type))
@@ -431,6 +451,36 @@ func (c *testProxyServerClient) processCommandMessage(msg *ProxyClientMessage) (
431451
return response, nil
432452
}
433453

454+
func (c *testProxyServerClient) processPayloadMessage(msg *ProxyClientMessage) (*ProxyServerMessage, error) {
455+
var response *ProxyServerMessage
456+
switch msg.Payload.Type {
457+
case "offer":
458+
pub := c.server.getPublisher(PublicSessionId(msg.Payload.ClientId))
459+
if pub == nil {
460+
response = msg.NewWrappedErrorServerMessage(fmt.Errorf("no such publisher: %s", msg.Payload.ClientId))
461+
return response, nil
462+
}
463+
464+
assert.Equal(c.t, MockSdpOfferAudioAndVideo, msg.Payload.Payload["sdp"])
465+
response = &ProxyServerMessage{
466+
Id: msg.Id,
467+
Type: "payload",
468+
Payload: &PayloadProxyServerMessage{
469+
ClientId: string(pub.id),
470+
Type: "answer",
471+
Payload: api.StringMap{
472+
"type": "answer",
473+
"sdp": MockSdpAnswerAudioAndVideo,
474+
},
475+
},
476+
}
477+
default:
478+
response = msg.NewWrappedErrorServerMessage(fmt.Errorf("payload type \"%s\" is not implemented", msg.Payload.Type))
479+
}
480+
481+
return response, nil
482+
}
483+
434484
func (c *testProxyServerClient) close() {
435485
c.mu.Lock()
436486
defer c.mu.Unlock()
@@ -2588,3 +2638,112 @@ func Test_ProxyResumeFail(t *testing.T) {
25882638
assert.NotEqual(sessionId, connections[0].SessionId())
25892639
}
25902640
}
2641+
2642+
func Test_ProxySetBandwidth(t *testing.T) {
2643+
t.Parallel()
2644+
require := require.New(t)
2645+
assert := assert.New(t)
2646+
server := NewProxyServerForTest(t, "DE")
2647+
mcu, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{
2648+
servers: []*TestProxyServerHandler{server},
2649+
}, 0)
2650+
2651+
hub, _, _, hubserver := CreateHubForTestWithConfig(t, func(s *httptest.Server) (*goconf.ConfigFile, error) {
2652+
config, err := getTestConfig(s)
2653+
if err != nil {
2654+
return nil, err
2655+
}
2656+
2657+
config.AddOption("backend", "maxstreambitrate", "700000")
2658+
config.AddOption("backend", "maxscreenbitrate", "800000")
2659+
2660+
config.AddOption("backend", "bitrateperroom", "1000000")
2661+
config.AddOption("backend", "minpublisherbitrate", "10000")
2662+
config.AddOption("backend", "maxpublisherbitrate", "500000")
2663+
return config, err
2664+
})
2665+
hub.SetMcu(mcu)
2666+
2667+
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
2668+
defer cancel()
2669+
2670+
client, hello := NewTestClientWithHello(ctx, t, hubserver, hub, testDefaultUserId+"1")
2671+
2672+
// Join room by id.
2673+
roomId := "test-room"
2674+
roomMsg := MustSucceed2(t, client.JoinRoom, ctx, roomId)
2675+
require.Equal(roomId, roomMsg.Room.RoomId)
2676+
client.RunUntilJoined(ctx, hello.Hello)
2677+
2678+
require.NoError(client.SendMessage(MessageClientMessageRecipient{
2679+
Type: "session",
2680+
SessionId: hello.Hello.SessionId,
2681+
}, MessageClientMessageData{
2682+
Type: "offer",
2683+
RoomType: "video",
2684+
Payload: api.StringMap{
2685+
"sdp": MockSdpOfferAudioAndVideo,
2686+
},
2687+
}))
2688+
2689+
client.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo)
2690+
2691+
pub := mcu.getPublisherConnection(hello.Hello.SessionId, StreamTypeVideo)
2692+
require.NotNil(pub)
2693+
2694+
var publisherId string
2695+
var publisher *mcuProxyPublisher
2696+
pub.publishersLock.RLock()
2697+
if assert.Len(pub.publishers, 1) {
2698+
for id, mcuPub := range pub.publishers {
2699+
publisherId = id
2700+
publisher = mcuPub
2701+
break
2702+
}
2703+
}
2704+
pub.publishersLock.RUnlock()
2705+
require.NotEmpty(publisherId)
2706+
require.NotNil(publisher)
2707+
2708+
proxyclient := server.GetSingleClient()
2709+
proxyclient.sendMessage(&ProxyServerMessage{
2710+
Type: "event",
2711+
Event: &EventProxyServerMessage{
2712+
Type: "update-load",
2713+
Load: 1,
2714+
ClientBandwidths: map[string]EventProxyServerBandwidth{
2715+
publisherId: {
2716+
Sent: api.BandwidthFromBits(0),
2717+
Received: api.BandwidthFromBits(100000),
2718+
},
2719+
},
2720+
},
2721+
})
2722+
2723+
for publisher.Bandwidth() == nil {
2724+
if !assert.NoError(ctx.Err()) {
2725+
break
2726+
}
2727+
2728+
time.Sleep(time.Millisecond)
2729+
}
2730+
if bw := publisher.Bandwidth(); assert.NotNil(bw) {
2731+
assert.EqualValues(0, bw.Sent)
2732+
assert.EqualValues(100000, bw.Received)
2733+
}
2734+
2735+
room := hub.getRoom(roomId)
2736+
require.NotNil(room)
2737+
room.updateBandwidth().Wait()
2738+
2739+
proxyPub := server.getPublisher(PublicSessionId(publisherId))
2740+
require.NotNil(proxyPub)
2741+
for proxyPub.bandwidth.Load() == 0 {
2742+
if !assert.NoError(ctx.Err()) {
2743+
break
2744+
}
2745+
2746+
time.Sleep(time.Millisecond)
2747+
}
2748+
assert.EqualValues(500000, proxyPub.bandwidth.Load())
2749+
}

proxy/proxy_server_test.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -443,20 +443,30 @@ func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, remoteId signali
443443

444444
type PublisherTestMCU struct {
445445
TestMCU
446+
447+
publisher atomic.Pointer[TestPublisherWithBandwidth]
446448
}
447449

448450
type TestPublisherWithBandwidth struct {
449451
TestMCUPublisher
450452

451-
bandwidth *signaling.McuClientBandwidthInfo
453+
t *testing.T
454+
bandwidth *signaling.McuClientBandwidthInfo
455+
bandwidthSet atomic.Bool
452456
}
453457

454458
func (p *TestPublisherWithBandwidth) Bandwidth() *signaling.McuClientBandwidthInfo {
455459
return p.bandwidth
456460
}
457461

458462
func (p *TestPublisherWithBandwidth) SetBandwidth(ctx context.Context, bandwidth api.Bandwidth) error {
459-
return errors.New("not implemented")
463+
assert.EqualValues(p.t, 20000, bandwidth)
464+
p.bandwidthSet.Store(true)
465+
return nil
466+
}
467+
468+
func (m *PublisherTestMCU) GetPublisher() *TestPublisherWithBandwidth {
469+
return m.publisher.Load()
460470
}
461471

462472
func (m *PublisherTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id signaling.PublicSessionId, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
@@ -467,11 +477,16 @@ func (m *PublisherTestMCU) NewPublisher(ctx context.Context, listener signaling.
467477
streamType: streamType,
468478
},
469479

480+
t: m.t,
470481
bandwidth: &signaling.McuClientBandwidthInfo{
471482
Sent: api.BandwidthFromBytes(1000),
472483
Received: api.BandwidthFromBytes(2000),
473484
},
474485
}
486+
if !m.publisher.CompareAndSwap(nil, publisher) {
487+
return nil, errors.New("only one publisher supported")
488+
}
489+
475490
return publisher, nil
476491
}
477492

@@ -518,12 +533,17 @@ func TestProxyPublisherBandwidth(t *testing.T) {
518533
},
519534
}))
520535

536+
var publisherId string
521537
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
522538
assert.Equal("2345", message.Id)
523539
if err := checkMessageType(message, "command"); assert.NoError(err) {
524540
assert.NotEmpty(message.Command.Id)
541+
publisherId = message.Command.Id
525542
}
526543
}
544+
require.NotEmpty(publisherId)
545+
publisher := mcu.GetPublisher()
546+
require.NotNil(publisher)
527547

528548
proxy.updateLoad()
529549

@@ -538,8 +558,33 @@ func TestProxyPublisherBandwidth(t *testing.T) {
538558
assert.EqualValues(10, *bw.Outgoing)
539559
}
540560
}
561+
if assert.Len(message.Event.ClientBandwidths, 1) {
562+
if bw := message.Event.ClientBandwidths; assert.NotNil(bw[publisherId], "expected %s, got %+v", bw) {
563+
assert.EqualValues(8000, bw[publisherId].Sent)
564+
assert.EqualValues(16000, bw[publisherId].Received)
565+
}
566+
}
567+
}
568+
}
569+
570+
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
571+
Id: "3456",
572+
Type: "command",
573+
Command: &signaling.CommandProxyClientMessage{
574+
Type: "update-bandwidth",
575+
ClientId: publisherId,
576+
Bandwidth: api.BandwidthFromBits(20000),
577+
},
578+
}))
579+
580+
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
581+
assert.Equal("3456", message.Id)
582+
if err := checkMessageType(message, "command"); assert.NoError(err) {
583+
assert.Equal(publisherId, message.Command.Id)
541584
}
542585
}
586+
587+
assert.True(publisher.bandwidthSet.Load(), "should have set bandwidth")
543588
}
544589

545590
type HangingTestMCU struct {

0 commit comments

Comments
 (0)