Skip to content

Commit 22be67a

Browse files
authored
Merge pull request #187 from GetStream/feature/uni-138-fix-ws-reconnection-errors
Feature/uni 138 fix ws reconnection errors
2 parents 3373cca + aa17d08 commit 22be67a

28 files changed

+586
-232
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System;
2+
using StreamVideo.Core.StatefulModels;
3+
4+
namespace StreamVideo.Core.Exceptions
5+
{
6+
/// <summary>
7+
/// Exception thrown when trying to join a <see cref="IStreamCall"/> but another <see cref="IStreamCall"/> is already joined or currently joining
8+
/// </summary>
9+
public class StreamCallInProgressException : Exception
10+
{
11+
public StreamCallInProgressException(string message) : base(message)
12+
{
13+
}
14+
}
15+
}

Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs.meta

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using System;
2+
using StreamVideo.Core.StatefulModels;
3+
4+
namespace StreamVideo.Core.Exceptions
5+
{
6+
/// <summary>
7+
/// Exception thrown when a <see cref="IStreamCall"/> with provided ID is not found
8+
/// </summary>
9+
public class StreamCallNotFoundException : Exception
10+
{
11+
public StreamCallNotFoundException(string message) : base(message)
12+
{
13+
14+
}
15+
}
16+
}

Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs.meta

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Packages/StreamVideo/Runtime/Core/IStreamVideoClient.cs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using StreamVideo.Core.QueryBuilders.Sort.Calls;
56
using StreamVideo.Core.DeviceManagers;
@@ -67,6 +68,8 @@ public interface IStreamVideoClient : IStreamVideoClientEventsListener, IDisposa
6768
/// </summary>
6869
/// <param name="credentials">Credentials required to connect user: api_key, user_id, and user_token</param>
6970
Task<IStreamVideoUser> ConnectUserAsync(AuthCredentials credentials);
71+
72+
Task<IStreamVideoUser> ConnectUserAsync(AuthCredentials credentials, CancellationToken cancellationToken);
7073

7174
/// <summary>
7275
/// Disconnect user from Stream server.
@@ -76,10 +79,19 @@ public interface IStreamVideoClient : IStreamVideoClientEventsListener, IDisposa
7679
Task<IStreamCall> JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring,
7780
bool notify);
7881

82+
Task<IStreamCall> JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring,
83+
bool notify, CancellationToken cancellationToken);
84+
7985
/// <summary>
80-
/// Gets call information without joining it. Will return null if the call doesn't exist
86+
/// Gets <see cref="IStreamCall"/> information without joining it. Will return null if the call doesn't exist
8187
/// </summary>
8288
Task<IStreamCall> GetCallAsync(StreamCallType callType, string callId);
89+
90+
/// <summary>
91+
/// Gets <see cref="IStreamCall"/> information without joining it. Will return null if the call doesn't exist
92+
/// </summary>
93+
Task<IStreamCall> GetCallAsync(StreamCallType callType, string callId,
94+
CancellationToken cancellationToken);
8395

8496
/// <summary>
8597
/// Get a call with a specified Type and ID. If such a call doesn't exist, it will be created.
@@ -88,6 +100,14 @@ Task<IStreamCall> JoinCallAsync(StreamCallType callType, string callId, bool cre
88100
/// <param name="callId">Call ID</param>
89101
/// <returns>Call object of type: <see cref="IStreamCall"/></returns>
90102
Task<IStreamCall> GetOrCreateCallAsync(StreamCallType callType, string callId);
103+
104+
/// <summary>
105+
/// Get a call with a specified Type and ID. If such a call doesn't exist, it will be created.
106+
/// </summary>
107+
/// <param name="callType">Call type - this defines the permissions and other settings for the call. Read more in the <a href="https://getstream.io/video/docs/unity/guides/call-types/">Call Types Docs</a></param>
108+
/// <param name="callId">Call ID</param>
109+
/// <returns>Call object of type: <see cref="IStreamCall"/></returns>
110+
Task<IStreamCall> GetOrCreateCallAsync(StreamCallType callType, string callId, CancellationToken cancellationToken);
91111

92112
/// <summary>
93113
/// Query calls

Packages/StreamVideo/Runtime/Core/InternalDTO/Sfu/GeneratedAPI.cs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System.Threading;
2+
13
namespace StreamVideo.Core.Sfu {
24
using Signal = StreamVideo.v1.Sfu.Signal;
35
// Generated by protoc-gen-twirpcs. DO NOT EDIT!
@@ -46,11 +48,12 @@ private static string parseJSONString(string jsonData, string key) {
4648
}
4749

4850
private delegate Resp doParsing<Resp>(byte[] data) where Resp : IMessage;
49-
private static async Task<Resp> DoRequest<Req, Resp>(HttpClient client, string address, Req req, doParsing<Resp> parserFunc) where Req : IMessage where Resp : IMessage {
51+
private static async Task<Resp> DoRequest<Req, Resp>(HttpClient client, string address, Req req, doParsing<Resp> parserFunc, CancellationToken cancellationToken = default) where Req : IMessage where Resp : IMessage {
5052
using (var content = new ByteArrayContent(req.ToByteArray())) {
5153
content.Headers.ContentType = CONTENT_TYPE_PROTOBUF;
52-
using (HttpResponseMessage response = await client.PostAsync(address, content)) {
54+
using (HttpResponseMessage response = await client.PostAsync(address, content, cancellationToken)) {
5355
var byteArr = await response.Content.ReadAsByteArrayAsync();
56+
cancellationToken.ThrowIfCancellationRequested();
5457
if (!response.IsSuccessStatusCode) {
5558
string errorJSON = System.Text.Encoding.UTF8.GetString(byteArr, 0, byteArr.Length);
5659
throw createException(errorJSON);
@@ -61,44 +64,44 @@ private static async Task<Resp> DoRequest<Req, Resp>(HttpClient client, string a
6164
}
6265

6366
// SetPublisher sends the WebRTC offer for the peer connection used to publish A/V
64-
public static async Task<Signal.SetPublisherResponse> SetPublisher(HttpClient client, Signal.SetPublisherRequest req) {
65-
return await DoRequest<Signal.SetPublisherRequest, Signal.SetPublisherResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SetPublisher", req, Signal.SetPublisherResponse.Parser.ParseFrom);
67+
public static async Task<Signal.SetPublisherResponse> SetPublisher(HttpClient client, Signal.SetPublisherRequest req, CancellationToken cancellationToken = default) {
68+
return await DoRequest<Signal.SetPublisherRequest, Signal.SetPublisherResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SetPublisher", req, Signal.SetPublisherResponse.Parser.ParseFrom, cancellationToken);
6669
}
6770

6871
// answer is sent by the client to the SFU after receiving a subscriber_offer.
69-
public static async Task<Signal.SendAnswerResponse> SendAnswer(HttpClient client, Signal.SendAnswerRequest req) {
70-
return await DoRequest<Signal.SendAnswerRequest, Signal.SendAnswerResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SendAnswer", req, Signal.SendAnswerResponse.Parser.ParseFrom);
72+
public static async Task<Signal.SendAnswerResponse> SendAnswer(HttpClient client, Signal.SendAnswerRequest req, CancellationToken cancellationToken = default) {
73+
return await DoRequest<Signal.SendAnswerRequest, Signal.SendAnswerResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SendAnswer", req, Signal.SendAnswerResponse.Parser.ParseFrom, cancellationToken);
7174
}
7275

7376
// SendICECandidate sends an ICE candidate to the client
74-
public static async Task<StreamVideo.v1.Sfu.Signal.ICETrickleResponse> IceTrickle(HttpClient client, StreamVideo.v1.Sfu.Models.ICETrickle req) {
75-
return await DoRequest<StreamVideo.v1.Sfu.Models.ICETrickle, StreamVideo.v1.Sfu.Signal.ICETrickleResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/IceTrickle", req, StreamVideo.v1.Sfu.Signal.ICETrickleResponse.Parser.ParseFrom);
77+
public static async Task<StreamVideo.v1.Sfu.Signal.ICETrickleResponse> IceTrickle(HttpClient client, StreamVideo.v1.Sfu.Models.ICETrickle req, CancellationToken cancellationToken = default) {
78+
return await DoRequest<StreamVideo.v1.Sfu.Models.ICETrickle, StreamVideo.v1.Sfu.Signal.ICETrickleResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/IceTrickle", req, StreamVideo.v1.Sfu.Signal.ICETrickleResponse.Parser.ParseFrom, cancellationToken);
7679
}
7780

7881
// UpdateSubscribers is used to notify the SFU about the list of video subscriptions
7982
// TODO: sync subscriptions based on this + update tracks using the dimension info sent by the user
80-
public static async Task<Signal.UpdateSubscriptionsResponse> UpdateSubscriptions(HttpClient client, Signal.UpdateSubscriptionsRequest req) {
81-
return await DoRequest<Signal.UpdateSubscriptionsRequest, Signal.UpdateSubscriptionsResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateSubscriptions", req, Signal.UpdateSubscriptionsResponse.Parser.ParseFrom);
83+
public static async Task<Signal.UpdateSubscriptionsResponse> UpdateSubscriptions(HttpClient client, Signal.UpdateSubscriptionsRequest req, CancellationToken cancellationToken = default) {
84+
return await DoRequest<Signal.UpdateSubscriptionsRequest, Signal.UpdateSubscriptionsResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateSubscriptions", req, Signal.UpdateSubscriptionsResponse.Parser.ParseFrom, cancellationToken);
8285
}
8386

84-
public static async Task<Signal.UpdateMuteStatesResponse> UpdateMuteStates(HttpClient client, Signal.UpdateMuteStatesRequest req) {
85-
return await DoRequest<Signal.UpdateMuteStatesRequest, Signal.UpdateMuteStatesResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateMuteStates", req, Signal.UpdateMuteStatesResponse.Parser.ParseFrom);
87+
public static async Task<Signal.UpdateMuteStatesResponse> UpdateMuteStates(HttpClient client, Signal.UpdateMuteStatesRequest req, CancellationToken cancellationToken = default) {
88+
return await DoRequest<Signal.UpdateMuteStatesRequest, Signal.UpdateMuteStatesResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateMuteStates", req, Signal.UpdateMuteStatesResponse.Parser.ParseFrom, cancellationToken);
8689
}
8790

88-
public static async Task<Signal.ICERestartResponse> IceRestart(HttpClient client, Signal.ICERestartRequest req) {
89-
return await DoRequest<Signal.ICERestartRequest, Signal.ICERestartResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/IceRestart", req, Signal.ICERestartResponse.Parser.ParseFrom);
91+
public static async Task<Signal.ICERestartResponse> IceRestart(HttpClient client, Signal.ICERestartRequest req, CancellationToken cancellationToken = default) {
92+
return await DoRequest<Signal.ICERestartRequest, Signal.ICERestartResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/IceRestart", req, Signal.ICERestartResponse.Parser.ParseFrom, cancellationToken);
9093
}
9194

92-
public static async Task<Signal.SendStatsResponse> SendStats(HttpClient client, Signal.SendStatsRequest req) {
93-
return await DoRequest<Signal.SendStatsRequest, Signal.SendStatsResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SendStats", req, Signal.SendStatsResponse.Parser.ParseFrom);
95+
public static async Task<Signal.SendStatsResponse> SendStats(HttpClient client, Signal.SendStatsRequest req, CancellationToken cancellationToken = default) {
96+
return await DoRequest<Signal.SendStatsRequest, Signal.SendStatsResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SendStats", req, Signal.SendStatsResponse.Parser.ParseFrom, cancellationToken);
9497
}
9598

96-
public static async Task<Signal.StartNoiseCancellationResponse> StartNoiseCancellation(HttpClient client, Signal.StartNoiseCancellationRequest req) {
97-
return await DoRequest<Signal.StartNoiseCancellationRequest, Signal.StartNoiseCancellationResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/StartNoiseCancellation", req, Signal.StartNoiseCancellationResponse.Parser.ParseFrom);
99+
public static async Task<Signal.StartNoiseCancellationResponse> StartNoiseCancellation(HttpClient client, Signal.StartNoiseCancellationRequest req, CancellationToken cancellationToken = default) {
100+
return await DoRequest<Signal.StartNoiseCancellationRequest, Signal.StartNoiseCancellationResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/StartNoiseCancellation", req, Signal.StartNoiseCancellationResponse.Parser.ParseFrom, cancellationToken);
98101
}
99102

100-
public static async Task<Signal.StopNoiseCancellationResponse> StopNoiseCancellation(HttpClient client, Signal.StopNoiseCancellationRequest req) {
101-
return await DoRequest<Signal.StopNoiseCancellationRequest, Signal.StopNoiseCancellationResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/StopNoiseCancellation", req, Signal.StopNoiseCancellationResponse.Parser.ParseFrom);
103+
public static async Task<Signal.StopNoiseCancellationResponse> StopNoiseCancellation(HttpClient client, Signal.StopNoiseCancellationRequest req, CancellationToken cancellationToken = default) {
104+
return await DoRequest<Signal.StopNoiseCancellationRequest, Signal.StopNoiseCancellationResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/StopNoiseCancellation", req, Signal.StopNoiseCancellationResponse.Parser.ParseFrom, cancellationToken);
102105
}
103106
}
104107
}

Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/IInternallVideoClientApi.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Threading;
12
using System.Threading.Tasks;
23
using StreamVideo.Core.InternalDTO.Models;
34
using StreamVideo.Core.InternalDTO.Requests;
@@ -8,13 +9,13 @@ namespace StreamVideo.Core.LowLevelClient.API.Internal
89
internal interface IInternalVideoClientApi
910
{
1011
Task<GetCallResponseInternalDTO> GetCallAsync(StreamCallType callType, string callId,
11-
GetOrCreateCallRequestInternalDTO getCallRequest);
12+
GetOrCreateCallRequestInternalDTO getCallRequest, CancellationToken cancellationToken);
1213

1314
Task<UpdateCallResponseInternalDTO> UpdateCallAsync(StreamCallType callType, string callId,
1415
UpdateCallRequestInternalDTO updateCallRequest);
1516

1617
Task<GetOrCreateCallResponseInternalDTO> GetOrCreateCallAsync(StreamCallType callType, string callId,
17-
GetOrCreateCallRequestInternalDTO getOrCreateCallRequest);
18+
GetOrCreateCallRequestInternalDTO getOrCreateCallRequest, CancellationToken cancellationToken);
1819

1920
Task<AcceptCallResponseInternalDTO> AcceptCallAsync(StreamCallType callType, string callId);
2021

Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalApiClientBase.cs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Text;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using StreamVideo.Core.Exceptions;
56
using StreamVideo.Core.InternalDTO.Models;
@@ -25,14 +26,18 @@ protected InternalApiClientBase(IHttpClient httpClient, ISerializer serializer,
2526
_lowLevelClient = lowLevelClient ?? throw new ArgumentNullException(nameof(lowLevelClient));
2627
}
2728

28-
protected Task<TResponse> Get<TPayload, TResponse>(string endpoint, TPayload payload)
29-
=> HttpRequest<TResponse>(HttpMethodType.Get, endpoint, payload);
29+
//StreamTODO: add cancellation token support to all
30+
31+
protected Task<TResponse> Get<TPayload, TResponse>(string endpoint, TPayload payload,
32+
CancellationToken cancellationToken)
33+
=> HttpRequest<TResponse>(HttpMethodType.Get, endpoint, payload, cancellationToken: cancellationToken);
3034

3135
protected Task<TResponse> Get<TResponse>(string endpoint, QueryParameters parameters = null)
3236
=> HttpRequest<TResponse>(HttpMethodType.Get, endpoint, queryParameters: parameters);
3337

34-
protected Task<TResponse> Post<TRequest, TResponse>(string endpoint, TRequest request = default)
35-
=> HttpRequest<TResponse>(HttpMethodType.Post, endpoint, request);
38+
protected Task<TResponse> Post<TRequest, TResponse>(string endpoint, TRequest request = default,
39+
CancellationToken cancellationToken = default)
40+
=> HttpRequest<TResponse>(HttpMethodType.Post, endpoint, request, cancellationToken: cancellationToken);
3641

3742
protected Task<TResponse> Post<TResponse>(string endpoint, object request = null)
3843
=> HttpRequest<TResponse>(HttpMethodType.Post, endpoint, request);
@@ -74,7 +79,7 @@ private object TrySerializeRequestBodyContent(object content, out string seriali
7479
}
7580

7681
private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod, string endpoint,
77-
object requestBody = default, QueryParameters queryParameters = null)
82+
object requestBody = default, QueryParameters queryParameters = null, CancellationToken cancellationToken = default)
7883
{
7984
// //StreamTodo: perhaps remove this requirement, sometimes we send empty body without any properties
8085
// if (requestBody == null && IsRequestBodyRequiredByHttpMethod(httpMethod))
@@ -95,7 +100,7 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,
95100

96101
LogFutureRequestIfDebug(uri, endpoint, httpMethod, logContent);
97102

98-
var httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent);
103+
var httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent, cancellationToken);
99104
var responseContent = httpResponse.Result;
100105

101106
if (!httpResponse.IsSuccessStatusCode)
@@ -121,6 +126,7 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,
121126
{
122127
_logs.Info($"Http request failed due to expired token, connection id: {_lowLevelClient.ConnectionId}");
123128
await _lowLevelClient.DisconnectAsync();
129+
cancellationToken.ThrowIfCancellationRequested();
124130
}
125131

126132
//StreamTodo: Refactor Token refresh logic. This relies on the fact that connecting fetches fresh token. But we can probably replace the token without breaking the connection
@@ -135,7 +141,7 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,
135141
while (_lowLevelClient.ConnectionState != ConnectionState.Connected)
136142
{
137143
i++;
138-
await Task.Delay(1);
144+
await Task.Delay(1, cancellationToken);
139145

140146
if (i > maxMsToWait)
141147
{
@@ -152,7 +158,7 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,
152158
// Recreate the uri to include new connection id
153159
uri = _requestUriFactory.CreateEndpointUri(endpoint, queryParameters);
154160

155-
httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent);
161+
httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent, cancellationToken);
156162
responseContent = httpResponse.Result;
157163
}
158164

0 commit comments

Comments
 (0)