diff --git a/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs
new file mode 100644
index 00000000..f2cb0616
--- /dev/null
+++ b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs
@@ -0,0 +1,15 @@
+using System;
+using StreamVideo.Core.StatefulModels;
+
+namespace StreamVideo.Core.Exceptions
+{
+ ///
+ /// Exception thrown when trying to join a but another is already joined or currently joining
+ ///
+ public class StreamCallInProgressException : Exception
+ {
+ public StreamCallInProgressException(string message) : base(message)
+ {
+ }
+ }
+}
\ No newline at end of file
diff --git a/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs.meta b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs.meta
new file mode 100644
index 00000000..b928a6ba
--- /dev/null
+++ b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs.meta
@@ -0,0 +1,3 @@
+fileFormatVersion: 2
+guid: 79ee770970f149d8b651612c496d6cb4
+timeCreated: 1764062143
\ No newline at end of file
diff --git a/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs
new file mode 100644
index 00000000..49b94402
--- /dev/null
+++ b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs
@@ -0,0 +1,16 @@
+using System;
+using StreamVideo.Core.StatefulModels;
+
+namespace StreamVideo.Core.Exceptions
+{
+ ///
+ /// Exception thrown when a with provided ID is not found
+ ///
+ public class StreamCallNotFoundException : Exception
+ {
+ public StreamCallNotFoundException(string message) : base(message)
+ {
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs.meta b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs.meta
new file mode 100644
index 00000000..6962d317
--- /dev/null
+++ b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs.meta
@@ -0,0 +1,3 @@
+fileFormatVersion: 2
+guid: 289d3871066b46ddb644e46fa1b9bc88
+timeCreated: 1764062138
\ No newline at end of file
diff --git a/Packages/StreamVideo/Runtime/Core/IStreamVideoClient.cs b/Packages/StreamVideo/Runtime/Core/IStreamVideoClient.cs
index 9dcaf148..acd052c7 100644
--- a/Packages/StreamVideo/Runtime/Core/IStreamVideoClient.cs
+++ b/Packages/StreamVideo/Runtime/Core/IStreamVideoClient.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
using StreamVideo.Core.QueryBuilders.Sort.Calls;
using StreamVideo.Core.DeviceManagers;
@@ -67,6 +68,8 @@ public interface IStreamVideoClient : IStreamVideoClientEventsListener, IDisposa
///
/// Credentials required to connect user: api_key, user_id, and user_token
Task ConnectUserAsync(AuthCredentials credentials);
+
+ Task ConnectUserAsync(AuthCredentials credentials, CancellationToken cancellationToken);
///
/// Disconnect user from Stream server.
@@ -76,10 +79,19 @@ public interface IStreamVideoClient : IStreamVideoClientEventsListener, IDisposa
Task JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring,
bool notify);
+ Task JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring,
+ bool notify, CancellationToken cancellationToken);
+
///
- /// Gets call information without joining it. Will return null if the call doesn't exist
+ /// Gets information without joining it. Will return null if the call doesn't exist
///
Task GetCallAsync(StreamCallType callType, string callId);
+
+ ///
+ /// Gets information without joining it. Will return null if the call doesn't exist
+ ///
+ Task GetCallAsync(StreamCallType callType, string callId,
+ CancellationToken cancellationToken);
///
/// Get a call with a specified Type and ID. If such a call doesn't exist, it will be created.
@@ -88,6 +100,14 @@ Task JoinCallAsync(StreamCallType callType, string callId, bool cre
/// Call ID
/// Call object of type:
Task GetOrCreateCallAsync(StreamCallType callType, string callId);
+
+ ///
+ /// Get a call with a specified Type and ID. If such a call doesn't exist, it will be created.
+ ///
+ /// Call type - this defines the permissions and other settings for the call. Read more in the Call Types Docs
+ /// Call ID
+ /// Call object of type:
+ Task GetOrCreateCallAsync(StreamCallType callType, string callId, CancellationToken cancellationToken);
///
/// Query calls
diff --git a/Packages/StreamVideo/Runtime/Core/InternalDTO/Sfu/GeneratedAPI.cs b/Packages/StreamVideo/Runtime/Core/InternalDTO/Sfu/GeneratedAPI.cs
index cf3cbd58..e48610ed 100644
--- a/Packages/StreamVideo/Runtime/Core/InternalDTO/Sfu/GeneratedAPI.cs
+++ b/Packages/StreamVideo/Runtime/Core/InternalDTO/Sfu/GeneratedAPI.cs
@@ -1,3 +1,5 @@
+using System.Threading;
+
namespace StreamVideo.Core.Sfu {
using Signal = StreamVideo.v1.Sfu.Signal;
// Generated by protoc-gen-twirpcs. DO NOT EDIT!
@@ -46,11 +48,12 @@ private static string parseJSONString(string jsonData, string key) {
}
private delegate Resp doParsing(byte[] data) where Resp : IMessage;
- private static async Task DoRequest(HttpClient client, string address, Req req, doParsing parserFunc) where Req : IMessage where Resp : IMessage {
+ private static async Task DoRequest(HttpClient client, string address, Req req, doParsing parserFunc, CancellationToken cancellationToken = default) where Req : IMessage where Resp : IMessage {
using (var content = new ByteArrayContent(req.ToByteArray())) {
content.Headers.ContentType = CONTENT_TYPE_PROTOBUF;
- using (HttpResponseMessage response = await client.PostAsync(address, content)) {
+ using (HttpResponseMessage response = await client.PostAsync(address, content, cancellationToken)) {
var byteArr = await response.Content.ReadAsByteArrayAsync();
+ cancellationToken.ThrowIfCancellationRequested();
if (!response.IsSuccessStatusCode) {
string errorJSON = System.Text.Encoding.UTF8.GetString(byteArr, 0, byteArr.Length);
throw createException(errorJSON);
@@ -61,44 +64,44 @@ private static async Task DoRequest(HttpClient client, string a
}
// SetPublisher sends the WebRTC offer for the peer connection used to publish A/V
- public static async Task SetPublisher(HttpClient client, Signal.SetPublisherRequest req) {
- return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SetPublisher", req, Signal.SetPublisherResponse.Parser.ParseFrom);
+ public static async Task SetPublisher(HttpClient client, Signal.SetPublisherRequest req, CancellationToken cancellationToken = default) {
+ return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SetPublisher", req, Signal.SetPublisherResponse.Parser.ParseFrom, cancellationToken);
}
// answer is sent by the client to the SFU after receiving a subscriber_offer.
- public static async Task SendAnswer(HttpClient client, Signal.SendAnswerRequest req) {
- return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SendAnswer", req, Signal.SendAnswerResponse.Parser.ParseFrom);
+ public static async Task SendAnswer(HttpClient client, Signal.SendAnswerRequest req, CancellationToken cancellationToken = default) {
+ return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SendAnswer", req, Signal.SendAnswerResponse.Parser.ParseFrom, cancellationToken);
}
// SendICECandidate sends an ICE candidate to the client
- public static async Task IceTrickle(HttpClient client, StreamVideo.v1.Sfu.Models.ICETrickle req) {
- return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/IceTrickle", req, StreamVideo.v1.Sfu.Signal.ICETrickleResponse.Parser.ParseFrom);
+ public static async Task IceTrickle(HttpClient client, StreamVideo.v1.Sfu.Models.ICETrickle req, CancellationToken cancellationToken = default) {
+ return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/IceTrickle", req, StreamVideo.v1.Sfu.Signal.ICETrickleResponse.Parser.ParseFrom, cancellationToken);
}
// UpdateSubscribers is used to notify the SFU about the list of video subscriptions
// TODO: sync subscriptions based on this + update tracks using the dimension info sent by the user
- public static async Task UpdateSubscriptions(HttpClient client, Signal.UpdateSubscriptionsRequest req) {
- return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateSubscriptions", req, Signal.UpdateSubscriptionsResponse.Parser.ParseFrom);
+ public static async Task UpdateSubscriptions(HttpClient client, Signal.UpdateSubscriptionsRequest req, CancellationToken cancellationToken = default) {
+ return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateSubscriptions", req, Signal.UpdateSubscriptionsResponse.Parser.ParseFrom, cancellationToken);
}
- public static async Task UpdateMuteStates(HttpClient client, Signal.UpdateMuteStatesRequest req) {
- return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateMuteStates", req, Signal.UpdateMuteStatesResponse.Parser.ParseFrom);
+ public static async Task UpdateMuteStates(HttpClient client, Signal.UpdateMuteStatesRequest req, CancellationToken cancellationToken = default) {
+ return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateMuteStates", req, Signal.UpdateMuteStatesResponse.Parser.ParseFrom, cancellationToken);
}
- public static async Task IceRestart(HttpClient client, Signal.ICERestartRequest req) {
- return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/IceRestart", req, Signal.ICERestartResponse.Parser.ParseFrom);
+ public static async Task IceRestart(HttpClient client, Signal.ICERestartRequest req, CancellationToken cancellationToken = default) {
+ return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/IceRestart", req, Signal.ICERestartResponse.Parser.ParseFrom, cancellationToken);
}
- public static async Task SendStats(HttpClient client, Signal.SendStatsRequest req) {
- return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SendStats", req, Signal.SendStatsResponse.Parser.ParseFrom);
+ public static async Task SendStats(HttpClient client, Signal.SendStatsRequest req, CancellationToken cancellationToken = default) {
+ return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SendStats", req, Signal.SendStatsResponse.Parser.ParseFrom, cancellationToken);
}
- public static async Task StartNoiseCancellation(HttpClient client, Signal.StartNoiseCancellationRequest req) {
- return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/StartNoiseCancellation", req, Signal.StartNoiseCancellationResponse.Parser.ParseFrom);
+ public static async Task StartNoiseCancellation(HttpClient client, Signal.StartNoiseCancellationRequest req, CancellationToken cancellationToken = default) {
+ return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/StartNoiseCancellation", req, Signal.StartNoiseCancellationResponse.Parser.ParseFrom, cancellationToken);
}
- public static async Task StopNoiseCancellation(HttpClient client, Signal.StopNoiseCancellationRequest req) {
- return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/StopNoiseCancellation", req, Signal.StopNoiseCancellationResponse.Parser.ParseFrom);
+ public static async Task StopNoiseCancellation(HttpClient client, Signal.StopNoiseCancellationRequest req, CancellationToken cancellationToken = default) {
+ return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/StopNoiseCancellation", req, Signal.StopNoiseCancellationResponse.Parser.ParseFrom, cancellationToken);
}
}
}
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/IInternallVideoClientApi.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/IInternallVideoClientApi.cs
index 12f22503..659b8943 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/IInternallVideoClientApi.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/IInternallVideoClientApi.cs
@@ -1,3 +1,4 @@
+using System.Threading;
using System.Threading.Tasks;
using StreamVideo.Core.InternalDTO.Models;
using StreamVideo.Core.InternalDTO.Requests;
@@ -8,13 +9,13 @@ namespace StreamVideo.Core.LowLevelClient.API.Internal
internal interface IInternalVideoClientApi
{
Task GetCallAsync(StreamCallType callType, string callId,
- GetOrCreateCallRequestInternalDTO getCallRequest);
+ GetOrCreateCallRequestInternalDTO getCallRequest, CancellationToken cancellationToken);
Task UpdateCallAsync(StreamCallType callType, string callId,
UpdateCallRequestInternalDTO updateCallRequest);
Task GetOrCreateCallAsync(StreamCallType callType, string callId,
- GetOrCreateCallRequestInternalDTO getOrCreateCallRequest);
+ GetOrCreateCallRequestInternalDTO getOrCreateCallRequest, CancellationToken cancellationToken);
Task AcceptCallAsync(StreamCallType callType, string callId);
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalApiClientBase.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalApiClientBase.cs
index db33b132..fbb83bd7 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalApiClientBase.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalApiClientBase.cs
@@ -1,5 +1,6 @@
using System;
using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using StreamVideo.Core.Exceptions;
using StreamVideo.Core.InternalDTO.Models;
@@ -25,14 +26,18 @@ protected InternalApiClientBase(IHttpClient httpClient, ISerializer serializer,
_lowLevelClient = lowLevelClient ?? throw new ArgumentNullException(nameof(lowLevelClient));
}
- protected Task Get(string endpoint, TPayload payload)
- => HttpRequest(HttpMethodType.Get, endpoint, payload);
+ //StreamTODO: add cancellation token support to all
+
+ protected Task Get(string endpoint, TPayload payload,
+ CancellationToken cancellationToken)
+ => HttpRequest(HttpMethodType.Get, endpoint, payload, cancellationToken: cancellationToken);
protected Task Get(string endpoint, QueryParameters parameters = null)
=> HttpRequest(HttpMethodType.Get, endpoint, queryParameters: parameters);
- protected Task Post(string endpoint, TRequest request = default)
- => HttpRequest(HttpMethodType.Post, endpoint, request);
+ protected Task Post(string endpoint, TRequest request = default,
+ CancellationToken cancellationToken = default)
+ => HttpRequest(HttpMethodType.Post, endpoint, request, cancellationToken: cancellationToken);
protected Task Post(string endpoint, object request = null)
=> HttpRequest(HttpMethodType.Post, endpoint, request);
@@ -74,7 +79,7 @@ private object TrySerializeRequestBodyContent(object content, out string seriali
}
private async Task HttpRequest(HttpMethodType httpMethod, string endpoint,
- object requestBody = default, QueryParameters queryParameters = null)
+ object requestBody = default, QueryParameters queryParameters = null, CancellationToken cancellationToken = default)
{
// //StreamTodo: perhaps remove this requirement, sometimes we send empty body without any properties
// if (requestBody == null && IsRequestBodyRequiredByHttpMethod(httpMethod))
@@ -95,7 +100,7 @@ private async Task HttpRequest(HttpMethodType httpMethod,
LogFutureRequestIfDebug(uri, endpoint, httpMethod, logContent);
- var httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent);
+ var httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent, cancellationToken);
var responseContent = httpResponse.Result;
if (!httpResponse.IsSuccessStatusCode)
@@ -121,6 +126,7 @@ private async Task HttpRequest(HttpMethodType httpMethod,
{
_logs.Info($"Http request failed due to expired token, connection id: {_lowLevelClient.ConnectionId}");
await _lowLevelClient.DisconnectAsync();
+ cancellationToken.ThrowIfCancellationRequested();
}
//StreamTodo: Refactor Token refresh logic. This relies on the fact that connecting fetches fresh token. But we can probably replace the token without breaking the connection
@@ -135,7 +141,7 @@ private async Task HttpRequest(HttpMethodType httpMethod,
while (_lowLevelClient.ConnectionState != ConnectionState.Connected)
{
i++;
- await Task.Delay(1);
+ await Task.Delay(1, cancellationToken);
if (i > maxMsToWait)
{
@@ -152,7 +158,7 @@ private async Task HttpRequest(HttpMethodType httpMethod,
// Recreate the uri to include new connection id
uri = _requestUriFactory.CreateEndpointUri(endpoint, queryParameters);
- httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent);
+ httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent, cancellationToken);
responseContent = httpResponse.Result;
}
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalVideoClientApi.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalVideoClientApi.cs
index f8cb69e9..39e9458e 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalVideoClientApi.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalVideoClientApi.cs
@@ -1,4 +1,5 @@
-using System.Threading.Tasks;
+using System.Threading;
+using System.Threading.Tasks;
using StreamVideo.Core.InternalDTO.Models;
using StreamVideo.Core.InternalDTO.Requests;
using StreamVideo.Core.InternalDTO.Responses;
@@ -20,17 +21,17 @@ public InternalVideoClientApi(IHttpClient httpClient, ISerializer serializer, IL
}
public Task GetCallAsync(StreamCallType callType, string callId,
- GetOrCreateCallRequestInternalDTO getCallRequest)
- => Get($"/call/{callType}/{callId}", getCallRequest);
+ GetOrCreateCallRequestInternalDTO getCallRequest, CancellationToken cancellationToken)
+ => Get($"/call/{callType}/{callId}", getCallRequest, cancellationToken);
public Task UpdateCallAsync(StreamCallType callType, string callId,
UpdateCallRequestInternalDTO updateCallRequest)
=> Patch($"/call/{callType}/{callId}", updateCallRequest);
public Task GetOrCreateCallAsync(StreamCallType callType, string callId,
- GetOrCreateCallRequestInternalDTO getOrCreateCallRequest)
+ GetOrCreateCallRequestInternalDTO getOrCreateCallRequest, CancellationToken cancellationToken)
=> Post($"/call/{callType}/{callId}",
- getOrCreateCallRequest);
+ getOrCreateCallRequest, cancellationToken);
public Task AcceptCallAsync(StreamCallType callType, string callId)
=> Post($"/call/{callType}/{callId}/accept");
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/IStreamVideoLowLevelClient.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/IStreamVideoLowLevelClient.cs
index 197d096f..7950b98c 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/IStreamVideoLowLevelClient.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/IStreamVideoLowLevelClient.cs
@@ -57,6 +57,6 @@ Task ConnectUserAsync(string apiKey, string userId, string userToken,
Task ConnectUserAsync(string apiKey, string userId, ITokenProvider tokenProvider,
CancellationToken cancellationToken = default);
- Task GetLocationHintAsync();
+ Task GetLocationHintAsync(CancellationToken cancellationToken);
}
}
\ No newline at end of file
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectScheduler.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectScheduler.cs
index f0be759d..a812779c 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectScheduler.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectScheduler.cs
@@ -8,6 +8,8 @@ internal interface IReconnectScheduler
{
double? NextReconnectTime { get; }
event Action ReconnectionScheduled;
+
+ void Reset();
}
///
/// Schedules next reconnection time based on the past attempts and network availability
@@ -43,8 +45,9 @@ private set
}
public ReconnectScheduler(ITimeService timeService, IStreamVideoLowLevelClient lowLevelClient,
- INetworkMonitor networkMonitor)
+ INetworkMonitor networkMonitor, Func shouldReconnect)
{
+ _shouldReconnect = shouldReconnect ?? throw new ArgumentNullException(nameof(shouldReconnect));
_client = lowLevelClient ?? throw new ArgumentNullException(nameof(lowLevelClient));
_timeService = timeService ?? throw new ArgumentNullException(nameof(timeService));
_networkMonitor = networkMonitor ?? throw new ArgumentNullException(nameof(networkMonitor));
@@ -98,6 +101,12 @@ void ThrowIfLessOrEqualToZero(float value, string name)
}
}
+ public void Reset()
+ {
+ NextReconnectTime = default;
+ _reconnectAttempts = 0;
+ }
+
public void Stop()
{
NextReconnectTime = float.MaxValue;
@@ -112,6 +121,7 @@ public void Stop()
private int _reconnectAttempts;
private bool _isStopped;
private double? _nextReconnectTime;
+ private Func _shouldReconnect;
private void TryScheduleNextReconnectTime()
{
@@ -120,7 +130,7 @@ private void TryScheduleNextReconnectTime()
return;
}
- if (_isStopped || ReconnectStrategy == ReconnectStrategy.Never)
+ if (_isStopped || ReconnectStrategy == ReconnectStrategy.Never || !_shouldReconnect())
{
return;
}
@@ -151,6 +161,7 @@ private void TryScheduleNextReconnectTime()
}
NextReconnectTime = GetNextReconnectTime();
+ _reconnectAttempts++;
}
private void OnConnectionStateChanged(ConnectionState previous, ConnectionState current)
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs
index 276d8451..68509a7e 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs
@@ -7,6 +7,7 @@
using System.Net.Http;
using System.Net.WebSockets;
using System.Text.RegularExpressions;
+using System.Threading;
using System.Threading.Tasks;
using StreamVideo.v1.Sfu.Events;
using StreamVideo.v1.Sfu.Models;
@@ -84,6 +85,7 @@ internal sealed class RtcSession : IMediaInputProvider, IDisposable
public event Action PublisherAudioTrackChanged;
public event Action PublisherVideoTrackChanged;
+ public event Action PeerConnectionDisconnectedDuringSession;
public bool PublisherAudioTrackIsEnabled
{
@@ -212,6 +214,16 @@ public Camera VideoSceneInput
}
#endregion
+
+ public bool ShouldSfuAttemptToReconnect()
+ {
+ if (CallState != CallingState.Joined && CallState != CallingState.Joining)
+ {
+ return false;
+ }
+
+ return !GetCurrentCancellationTokenOrDefault().IsCancellationRequested;
+ }
public string SessionId { get; private set; } = "(empty)";
@@ -270,10 +282,11 @@ public void Update()
TryExecuteSubscribeToTracks();
}
- public async Task SendWebRtcStats(SendStatsRequest request)
+ public async Task SendWebRtcStats(SendStatsRequest request, CancellationToken cancellationToken)
{
var response = await RpcCallAsync(request, GeneratedAPI.SendStats,
- nameof(GeneratedAPI.SendStats), response => response.Error, postLog: LogWebRTCStats);
+ nameof(GeneratedAPI.SendStats), cancellationToken, response => response.Error,
+ postLog: LogWebRTCStats);
if (ActiveCall == null)
{
@@ -298,7 +311,40 @@ public async Task SendWebRtcStats(SendStatsRequest request)
//StreamTodo: solve this dependency better
public void SetCache(ICache cache) => _cache = cache;
- public async Task StartAsync(StreamCall call)
+ private void ValidateCallCredentialsOrThrow(IStreamCall call)
+ {
+ if (call.Credentials == null)
+ {
+ throw new ArgumentNullException(nameof(call.Credentials));
+ }
+
+ if (call.Credentials.Server == null)
+ {
+ throw new ArgumentNullException(nameof(call.Credentials.Server));
+ }
+
+ if (string.IsNullOrEmpty(call.Credentials.Server.Url))
+ {
+ throw new ArgumentNullException(nameof(call.Credentials.Server.Url));
+ }
+
+ if (string.IsNullOrEmpty(call.Credentials.Token))
+ {
+ throw new ArgumentNullException(nameof(call.Credentials.Token));
+ }
+
+ if (call.Credentials.IceServers == null)
+ {
+ throw new ArgumentNullException(nameof(call.Credentials.IceServers));
+ }
+
+ if (call.Credentials.IceServers.Count == 0)
+ {
+ throw new ArgumentException("At least one ICE server must be provided in call credentials.");
+ }
+ }
+
+ public async Task StartAsync(StreamCall call, CancellationToken cancellationToken = default)
{
if (ActiveCall != null)
{
@@ -306,18 +352,45 @@ public async Task StartAsync(StreamCall call)
$"Cannot start new session until previous call is active. Active call: {ActiveCall}");
}
+ if (call == null)
+ {
+ throw new ArgumentNullException(nameof(call));
+ }
+
try
{
+ _logs.Info($"Start joining a call: type={call.Type}, id={call.Id}");
+
//StreamTodo: perhaps not necessary here
ClearSession();
+ if (_joinCallCts != null)
+ {
+ _logs.ErrorIfDebug("Previous join call CTS was not cleaned up properly. Cancelling it now.");
+ _joinCallCts.Cancel();
+ _joinCallCts.Dispose();
+ _joinCallCts = null;
+ }
+
+ if (_activeCallCts != null)
+ {
+ _logs.ErrorIfDebug("Previous active call CTS was not cleaned up properly. Cancelling it now.");
+ _activeCallCts.Cancel();
+ _activeCallCts.Dispose();
+ _activeCallCts = null;
+ }
+
+ _joinCallCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+
SubscribeToSfuEvents();
- ActiveCall = call ?? throw new ArgumentNullException(nameof(call));
+ ActiveCall = call;
_httpClient = _httpClientFactory(ActiveCall);
CallState = CallingState.Joining;
+ ValidateCallCredentialsOrThrow(ActiveCall);
+
var sfuUrl = call.Credentials.Server.Url;
var sfuToken = call.Credentials.Token;
var iceServers = call.Credentials.IceServers;
@@ -339,16 +412,30 @@ public async Task StartAsync(StreamCall call)
#endif
// We don't set initial offer as local. Later on we set generated answer as a local
- var offer = await Subscriber.CreateOfferAsync();
+ var offer = await Subscriber.CreateOfferAsync(_joinCallCts.Token);
+
+ if (string.IsNullOrEmpty(offer.sdp))
+ {
+ throw new ArgumentException("Generated offer SDP is null or empty");
+ }
_sfuWebSocket.SetSessionData(SessionId, offer.sdp, sfuUrl, sfuToken);
- await _sfuWebSocket.ConnectAsync();
+ await _sfuWebSocket.ConnectAsync(cancellationToken);
+ // Wait for call to be joined with timeout
+ const int joinTimeoutSeconds = 30;
+ var joinStartTime = _timeService.Time;
while (CallState != CallingState.Joined)
{
- //StreamTodo: implement a timeout if something goes wrong
- //StreamTodo: implement cancellation token
- await Task.Delay(1);
+ await Task.Delay(1, cancellationToken);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var elapsedTime = _timeService.Time - joinStartTime;
+ if (elapsedTime > joinTimeoutSeconds)
+ {
+ throw new TimeoutException(
+ $"Failed to join call within {joinTimeoutSeconds} seconds. Current state: {CallState}");
+ }
}
// Wait for SFU connected to receive track prefix
@@ -357,7 +444,7 @@ public async Task StartAsync(StreamCall call)
CreatePublisher(iceServers);
}
- await SubscribeToTracksAsync();
+ await SubscribeToTracksAsync(cancellationToken);
if (UseNativeAudioBindings)
{
@@ -370,36 +457,71 @@ public async Task StartAsync(StreamCall call)
//StreamTodo: validate when this state should set
CallState = CallingState.Joined;
+ _activeCallCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+
+ _logs.Info($"Joined call: type={call.Type}, id={call.Id}");
+
#if STREAM_DEBUG_ENABLED
_videoAudioSyncBenchmark?.Init(call);
#endif
}
- catch
+ catch(OperationCanceledException)
+ {
+ ClearSession();
+ throw;
+ }
+ catch(Exception e)
{
+ _logs.Exception(e);
ClearSession();
throw;
}
+ finally
+ {
+ if (_joinCallCts != null)
+ {
+ _joinCallCts.Dispose();
+ _joinCallCts = null;
+ }
+ }
}
public async Task StopAsync(string reason = "")
{
+ if (UseNativeAudioBindings)
+ {
+#if STREAM_NATIVE_AUDIO
+ WebRTC.StopAudioPlayback();
+#endif
+ }
+
if (CallState == CallingState.Leaving || CallState == CallingState.Offline)
{
+ //StreamTODO: should this return a task of the ongoing stop?
return;
}
+ if (CallState != CallingState.Joined && CallState != CallingState.Joining)
+ {
+ throw new InvalidOperationException(
+ "Tried to leave call that is not joined or joining. Current state: " + CallState);
+ }
+
CallState = CallingState.Leaving;
+ if (_joinCallCts != null)
+ {
+ _joinCallCts.Cancel();
+ }
- if (UseNativeAudioBindings)
+ if (_activeCallCts != null)
{
-#if STREAM_NATIVE_AUDIO
- WebRTC.StopAudioPlayback();
-#endif
+ _activeCallCts.Cancel();
}
if (ActiveCall != null)
{
+ _logs.Info("Leaving call...");
try
{
// Trace leave call before leaving the call. Otherwise, stats are not send because SFU WS disconnects
@@ -407,9 +529,11 @@ public async Task StopAsync(string reason = "")
if (_statsSender != null) // This was null in tests
{
+ var sendStatsCancellationToken = new CancellationTokenSource();
+ sendStatsCancellationToken.CancelAfter(400);
using (new TimeLogScope("Sending final stats on leave", _logs.Info))
{
- await _statsSender.SendFinalStatsAsync();
+ await _statsSender.SendFinalStatsAsync(sendStatsCancellationToken.Token);
}
}
}
@@ -417,24 +541,16 @@ public async Task StopAsync(string reason = "")
{
_logs.Warning($"Failed to send final stats on leave: {e.Message}");
}
-
-#if STREAM_DEBUG_ENABLED
- if (_sfuWebSocket.SendQueueCount > 0)
- {
- _logs.Error(
- $"Waited for 300+ ms for SFU messages to be sent. Remaining: {_sfuWebSocket.SendQueueCount}");
- }
-#endif
}
ClearSession();
- //StreamTodo: check with js definition of "offline"
using (new TimeLogScope("Sending leave call request & disconnect", _logs.Info))
{
await _sfuWebSocket.DisconnectAsync(WebSocketCloseStatus.NormalClosure, reason);
}
+ //StreamTodo: check with js definition of "offline"
CallState = CallingState.Offline;
#if STREAM_DEBUG_ENABLED
@@ -561,6 +677,9 @@ private readonly Dictionary _videoResolutionByParticipa
private MicrophoneDeviceInfo _activeAudioRecordingDevice;
+ private CancellationTokenSource _joinCallCts;
+ private CancellationTokenSource _activeCallCts;
+
private void ClearSession()
{
UnsubscribeFromSfuEvents();
@@ -578,10 +697,32 @@ private void ClearSession()
CallState = CallingState.Unknown;
_httpClient = null;
+ if (_joinCallCts != null)
+ {
+ _joinCallCts.Dispose();
+ _joinCallCts = null;
+ }
+
+ if (_activeCallCts != null)
+ {
+ _activeCallCts.Dispose();
+ _activeCallCts = null;
+ }
+
_trackSubscriptionRequested = false;
_trackSubscriptionRequestInProgress = false;
}
+ private CancellationToken GetCurrentCancellationTokenOrDefault()
+ {
+ if (_activeCallCts != null)
+ {
+ return _activeCallCts.Token;
+ }
+
+ return _joinCallCts?.Token ?? default;
+ }
+
//StreamTodo: request track subscriptions when SFU got changed. Android comment for setVideoSubscriptions:
/*
* - it sends the resolutions we're displaying the video at so the SFU can decide which track to send
@@ -615,7 +756,16 @@ private void TryExecuteSubscribeToTracks()
return;
}
- SubscribeToTracksAsync().LogIfFailed();
+ //StreamTODO: add cancellation token support
+ SubscribeToTracksAsync(GetCurrentCancellationTokenOrDefault()).ContinueWith(t =>
+ {
+ if (ActiveCall == null)
+ {
+ return;
+ }
+
+ t.LogIfFailed();
+ });
_lastTrackSubscriptionRequestTime = _timeService.Time;
_trackSubscriptionRequested = false;
@@ -624,7 +774,8 @@ private void TryExecuteSubscribeToTracks()
///
/// Request this via . We don't want to call it too often
///
- private async Task SubscribeToTracksAsync()
+ ///
+ private async Task SubscribeToTracksAsync(CancellationToken cancellationToken)
{
if (ActiveCall?.Participants == null || !ActiveCall.Participants.Any())
{
@@ -658,7 +809,7 @@ private async Task SubscribeToTracksAsync()
#endif
var response = await RpcCallAsync(request, GeneratedAPI.UpdateSubscriptions,
- nameof(GeneratedAPI.UpdateSubscriptions), response => response.Error);
+ nameof(GeneratedAPI.UpdateSubscriptions), cancellationToken, response => response.Error);
if (ActiveCall == null)
{
@@ -768,8 +919,9 @@ private async Task SendIceCandidateAsync(RTCIceCandidate candidate, StreamPeerTy
if (_callState == CallingState.Joined)
{
+ var cancellationToken = GetCurrentCancellationTokenOrDefault();
await RpcCallAsync(iceTrickle, GeneratedAPI.IceTrickle, nameof(GeneratedAPI.IceTrickle),
- response => response.Error);
+ cancellationToken, response => response.Error);
}
else
{
@@ -810,6 +962,8 @@ private void UpdateAudioRecording()
private void OnSfuJoinResponse(JoinResponse joinResponse)
{
+ //StreamTODO: what if left the call and started a new one but the JoinResponse belongs to the previous session?
+
_sfuTracer?.Trace(PeerConnectionTraceKey.JoinRequest, joinResponse);
_logs.InfoIfDebug($"Handle Sfu {nameof(JoinResponse)}");
ActiveCall.UpdateFromSfu(joinResponse);
@@ -820,10 +974,16 @@ private void OnSfuJoinedCall()
{
CallState = CallingState.Joined;
+ var cancellationToken = GetCurrentCancellationTokenOrDefault();
foreach (var iceTrickle in _pendingIceTrickleRequests)
{
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return;
+ }
+
RpcCallAsync(iceTrickle, GeneratedAPI.IceTrickle, nameof(GeneratedAPI.IceTrickle),
- response => response.Error).LogIfFailed();
+ cancellationToken, response => response.Error).LogIfFailed();
}
}
@@ -865,6 +1025,11 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer)
try
{
+ if (GetCurrentCancellationTokenOrDefault().IsCancellationRequested)
+ {
+ return;
+ }
+
//StreamTodo: handle subscriberOffer.iceRestart
var rtcSessionDescription = new RTCSessionDescription
{
@@ -874,7 +1039,7 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer)
try
{
- await Subscriber.SetRemoteDescriptionAsync(rtcSessionDescription);
+ await Subscriber.SetRemoteDescriptionAsync(rtcSessionDescription, GetCurrentCancellationTokenOrDefault());
Subscriber.ThrowDisposedDuringOperationIfNull();
}
catch (Exception e)
@@ -884,14 +1049,14 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer)
throw;
}
- var answer = await Subscriber.CreateAnswerAsync();
+ var answer = await Subscriber.CreateAnswerAsync(GetCurrentCancellationTokenOrDefault());
Subscriber.ThrowDisposedDuringOperationIfNull();
//StreamTodo: mangle SDP
try
{
- await Subscriber.SetLocalDescriptionAsync(ref answer);
+ await Subscriber.SetLocalDescriptionAsync(ref answer, GetCurrentCancellationTokenOrDefault());
Subscriber.ThrowDisposedDuringOperationIfNull();
}
catch (Exception e)
@@ -909,11 +1074,15 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer)
};
await RpcCallAsync(sendAnswerRequest, GeneratedAPI.SendAnswer, nameof(GeneratedAPI.SendAnswer),
- response => response.Error,
- preLog: true);
+ GetCurrentCancellationTokenOrDefault(), response => response.Error, preLog: true);
+ }
+ catch (OperationCanceledException)
+ {
+ // Expected during shutdown, don't log as error
}
catch (DisposedDuringOperationException)
{
+ // Expected during shutdown
}
catch (Exception e)
{
@@ -930,11 +1099,14 @@ private void OnSfuTrackUnpublished(TrackUnpublished trackUnpublished)
var sessionId = trackUnpublished.SessionId;
var type = trackUnpublished.Type.ToPublicEnum();
var cause = trackUnpublished.Cause;
+
+ // StreamTODO: test if this works well with other user muting this user
+ var updateLocalParticipantState = cause != TrackUnpublishReason.Unspecified && cause != TrackUnpublishReason.UserMuted;
// Optionally available. Read TrackUnpublished.participant comment in events.proto
var participantSfuDto = trackUnpublished.Participant;
- UpdateParticipantTracksState(userId, sessionId, type, isEnabled: false, out var participant);
+ UpdateParticipantTracksState(userId, sessionId, type, isEnabled: false, updateLocalParticipantState, out var participant);
if (participantSfuDto != null && participant != null)
{
@@ -955,7 +1127,7 @@ private void OnSfuTrackPublished(TrackPublished trackPublished)
// Optionally available. Read TrackUnpublished.participant comment in events.proto
var participantSfuDto = trackPublished.Participant;
- UpdateParticipantTracksState(userId, sessionId, type, isEnabled: true, out var participant);
+ UpdateParticipantTracksState(userId, sessionId, type, isEnabled: true, updateLocalParticipantState: true, out var participant);
if (participantSfuDto != null && participant != null)
{
@@ -968,7 +1140,7 @@ private void OnSfuTrackPublished(TrackPublished trackPublished)
}
private void UpdateParticipantTracksState(string userId, string sessionId, TrackType trackType, bool isEnabled,
- out StreamVideoCallParticipant participant)
+ bool updateLocalParticipantState, out StreamVideoCallParticipant participant)
{
participant = (StreamVideoCallParticipant)ActiveCall.Participants.FirstOrDefault(p
=> p.SessionId == sessionId);
@@ -978,7 +1150,7 @@ private void UpdateParticipantTracksState(string userId, string sessionId, Track
return;
}
- if (participant.IsLocalParticipant)
+ if (participant.IsLocalParticipant && updateLocalParticipantState)
{
//StreamTODO: most probably expose RtcSession TrackStateChanged event so that AudioDeviceManager can subscribe
@@ -1015,18 +1187,20 @@ private async Task UpdateMuteStateAsync(TrackType trackType, bool isEnabled)
return;
}
+ var cancellationToken = _joinCallCts?.Token ?? default;
await RpcCallAsync(new UpdateMuteStatesRequest
- {
- SessionId = SessionId,
- MuteStates =
{
- new TrackMuteState
+ SessionId = SessionId,
+ MuteStates =
{
- TrackType = trackType.ToInternalEnum(),
- Muted = !isEnabled
+ new TrackMuteState
+ {
+ TrackType = trackType.ToInternalEnum(),
+ Muted = !isEnabled
+ }
}
- }
- }, GeneratedAPI.UpdateMuteStates, nameof(GeneratedAPI.UpdateSubscriptions), response => response.Error);
+ }, GeneratedAPI.UpdateMuteStates, nameof(GeneratedAPI.UpdateSubscriptions), cancellationToken,
+ response => response.Error);
}
private void InternalExecuteSetPublisherAudioTrackEnabled(bool isEnabled)
@@ -1205,8 +1379,9 @@ private void OnSfuWebSocketOnCallEnded()
//StreamTodo: implement retry strategy like in Android SDK
//If possible, take into account if we the update is still valid e.g.
private async Task RpcCallAsync(TRequest request,
- Func> rpcCallAsync, string debugRequestName,
- Func getError, bool preLog = false,
+ Func> rpcCallAsync, string debugRequestName,
+ CancellationToken cancellationToken, Func getError,
+ bool preLog = false,
bool postLog = true)
{
//StreamTodo: use rpcCallAsync.GetMethodInfo().Name; instead debugRequestName
@@ -1236,7 +1411,8 @@ var errorMsg
}
#endif
- var response = await rpcCallAsync(_httpClient, request);
+ // StreamTODO: Add multiple retries
+ var response = await rpcCallAsync(_httpClient, request, cancellationToken);
if (!skipTracing)
{
@@ -1322,7 +1498,12 @@ private async void OnPublisherNegotiationNeeded()
$"{nameof(Publisher.SignalingState)} state is not stable, current state: {Publisher.SignalingState}");
}
- var offer = await Publisher.CreateOfferAsync();
+ if(GetCurrentCancellationTokenOrDefault().IsCancellationRequested)
+ {
+ return;
+ }
+
+ var offer = await Publisher.CreateOfferAsync(GetCurrentCancellationTokenOrDefault());
Publisher.ThrowDisposedDuringOperationIfNull();
//StreamTOodo: check if SDP is null or empty (this would throw an exception during setting)
@@ -1344,7 +1525,7 @@ private async void OnPublisherNegotiationNeeded()
try
{
- await Publisher.SetLocalDescriptionAsync(ref offer);
+ await Publisher.SetLocalDescriptionAsync(ref offer, GetCurrentCancellationTokenOrDefault());
Publisher.ThrowDisposedDuringOperationIfNull();
}
catch (Exception e)
@@ -1383,8 +1564,9 @@ private async void OnPublisherNegotiationNeeded()
_logs.Warning($"SetPublisherRequest:\n{serializedRequest}");
#endif
+ //StreamTODO: add cancellation token support
var result = await RpcCallAsync(request, GeneratedAPI.SetPublisher, nameof(GeneratedAPI.SetPublisher),
- response => response.Error);
+ GetCurrentCancellationTokenOrDefault(), response => response.Error);
Publisher.ThrowDisposedDuringOperationIfNull();
#if STREAM_DEBUG_ENABLED
@@ -1397,7 +1579,7 @@ await Publisher.SetRemoteDescriptionAsync(new RTCSessionDescription()
{
type = RTCSdpType.Answer,
sdp = result.Sdp
- });
+ }, GetCurrentCancellationTokenOrDefault());
}
catch (Exception e)
{
@@ -1406,8 +1588,13 @@ await Publisher.SetRemoteDescriptionAsync(new RTCSessionDescription()
throw;
}
}
+ catch (OperationCanceledException)
+ {
+ // Expected during shutdown, don't log as error
+ }
catch (DisposedDuringOperationException)
{
+ // Expected during shutdown
}
catch (Exception e)
{
@@ -1666,6 +1853,7 @@ private void CreatePublisher(IEnumerable iceServers)
Publisher.NegotiationNeeded += OnPublisherNegotiationNeeded;
Publisher.PublisherAudioTrackChanged += OnPublisherAudioTrackChanged;
Publisher.PublisherVideoTrackChanged += OnPublisherVideoTrackChanged;
+ Publisher.Disconnected += PublisherOnDisconnected;
Publisher.InitPublisherTracks();
@@ -1681,6 +1869,7 @@ private void DisposePublisher()
Publisher.NegotiationNeeded -= OnPublisherNegotiationNeeded;
Publisher.PublisherAudioTrackChanged -= OnPublisherAudioTrackChanged;
Publisher.PublisherVideoTrackChanged -= OnPublisherVideoTrackChanged;
+ Publisher.Disconnected += PublisherOnDisconnected;
Publisher.Dispose();
Publisher = null;
}
@@ -1696,6 +1885,14 @@ private void OnPublisherVideoTrackChanged(VideoStreamTrack videoTrack)
{
PublisherVideoTrackChanged?.Invoke();
}
+
+ void PublisherOnDisconnected()
+ {
+ if (CallState == CallingState.Joined || CallState == CallingState.Joining)
+ {
+ PeerConnectionDisconnectedDuringSession?.Invoke();
+ }
+ }
private void OnSfuWebSocketDisconnected()
{
@@ -1715,6 +1912,7 @@ private static bool AssertCallIdMatch(IStreamCall activeCall, string callId, ILo
return true;
}
+
private void SubscribeToSfuEvents()
{
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamPeerConnection.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamPeerConnection.cs
index 98967ca8..e2c1aa66 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamPeerConnection.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamPeerConnection.cs
@@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using StreamVideo.Core.Configs;
using StreamVideo.Core.Models;
@@ -30,6 +31,8 @@ internal class StreamPeerConnection : IDisposable
public event Action PublisherVideoTrackChanged;
public event Action PublisherAudioTrackChanged;
+ public event Action Disconnected;
+
public bool IsRemoteDescriptionAvailable
{
get
@@ -153,20 +156,21 @@ public void InitPublisherTracks()
public void RestartIce() => _peerConnection.RestartIce();
- public Task SetLocalDescriptionAsync(ref RTCSessionDescription offer)
+ public Task SetLocalDescriptionAsync(ref RTCSessionDescription offer,
+ CancellationToken cancellationToken)
{
#if STREAM_DEBUG_ENABLED
_logs.Warning($"[{_peerType}] Set LocalDesc:\n" + offer.sdp);
#endif
_tracer?.Trace(PeerConnectionTraceKey.SetLocalDescription, offer.sdp);
- return _peerConnection.SetLocalDescriptionAsync(ref offer);
+ return _peerConnection.SetLocalDescriptionAsync(ref offer, cancellationToken);
}
- public async Task SetRemoteDescriptionAsync(RTCSessionDescription offer)
+ public async Task SetRemoteDescriptionAsync(RTCSessionDescription offer, CancellationToken cancellationToken)
{
_tracer?.Trace(PeerConnectionTraceKey.SetRemoteDescription, offer.sdp);
- await _peerConnection.SetRemoteDescriptionAsync(ref offer);
+ await _peerConnection.SetRemoteDescriptionAsync(ref offer, cancellationToken);
#if STREAM_DEBUG_ENABLED
_logs.Warning(
@@ -200,16 +204,16 @@ public void AddIceCandidate(RTCIceCandidateInit iceCandidateInit)
_peerConnection.AddIceCandidate(iceCandidate);
}
- public async Task CreateOfferAsync()
+ public async Task CreateOfferAsync(CancellationToken cancellationToken)
{
- var offer = await _peerConnection.CreateOfferAsync();
+ var offer = await _peerConnection.CreateOfferAsync(cancellationToken);
_tracer?.Trace(PeerConnectionTraceKey.CreateOffer, offer.sdp);
return offer;
}
- public async Task CreateAnswerAsync()
+ public async Task CreateAnswerAsync(CancellationToken cancellationToken)
{
- var answer = await _peerConnection.CreateAnswerAsync();
+ var answer = await _peerConnection.CreateAnswerAsync(cancellationToken);
_tracer?.Trace(PeerConnectionTraceKey.CreateAnswer, answer.sdp);
return answer;
}
@@ -225,7 +229,7 @@ public void Update()
}
}
- public Task GetStatsReportAsync() => _peerConnection.GetStatsAsync();
+ public Task GetStatsReportAsync(CancellationToken cancellationToken) => _peerConnection.GetStatsAsync(cancellationToken);
public PublisherVideoSettings GetLatestVideoSettings()
{
@@ -401,6 +405,11 @@ private void OnConnectionStateChange(RTCPeerConnectionState state)
_logs.Warning($"[{_peerType}] OnConnectionStateChange to: {state}");
#endif
_tracer?.Trace(PeerConnectionTraceKey.OnConnectionStateChange, state.ToString());
+
+ if (state == RTCPeerConnectionState.Disconnected)
+ {
+ Disconnected?.Invoke();
+ }
}
private void OnTrack(RTCTrackEvent trackEvent)
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamVideoLowLevelClient.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamVideoLowLevelClient.cs
index 79e13826..47840ce5 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamVideoLowLevelClient.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamVideoLowLevelClient.cs
@@ -143,8 +143,8 @@ public StreamVideoLowLevelClient(IWebsocketClient coordinatorWebSocket, IWebsock
}
//StreamTodo: move to factory
- var coordinatorReconnect = new ReconnectScheduler(_timeService, this, _networkMonitor);
- var sfuReconnect = new ReconnectScheduler(_timeService, this, _networkMonitor);
+ var coordinatorReconnect = new ReconnectScheduler(_timeService, this, _networkMonitor, shouldReconnect: () => true);
+ var sfuReconnect = new ReconnectScheduler(_timeService, this, _networkMonitor, shouldReconnect: () => RtcSession.ShouldSfuAttemptToReconnect());
//StreamTodo: move to factory
_coordinatorWS = new CoordinatorWebSocket(coordinatorWebSocket, coordinatorReconnect, authProvider: this,
@@ -189,7 +189,7 @@ public async Task ConnectUserAsync(string apiKey, string userId, string userToke
#endif
await _coordinatorWS.ConnectAsync(cancellationToken);
- await UpdateLocationHintAsync();
+ await UpdateLocationHintAsync(cancellationToken);
Connected?.Invoke();
}
@@ -222,12 +222,12 @@ public void Update()
RtcSession.Update();
}
- public async Task GetLocationHintAsync()
+ public async Task GetLocationHintAsync(CancellationToken cancellationToken = default)
{
if (_locationHint.IsNullOrEmpty())
{
_logs.Warning("No location hint - retrying to fetch it");
- await UpdateLocationHintAsync();
+ await UpdateLocationHintAsync(cancellationToken);
}
// StreamTodo: attempt to get location hint if not fetched already + perhaps there's an ongoing request and we can just wait
if (_locationHint.IsNullOrEmpty())
@@ -297,7 +297,7 @@ public void Dispose()
internal IInternalVideoClientApi InternalVideoClientApi { get; }
internal RtcSession RtcSession { get; }
- internal Task StartCallSessionAsync(StreamCall call) => RtcSession.StartAsync(call);
+ internal Task StartCallSessionAsync(StreamCall call, CancellationToken cancellationToken) => RtcSession.StartAsync(call, cancellationToken);
//internal Task StopCallSessionAsync() => RtcSession.StopAsync(); //StreamTodo: remove
@@ -366,10 +366,10 @@ private void OnSfuWebSocketConnectionStateChanged(ConnectionState previous, Conn
//StreamTodo: cancellation token
//StreamTodo: make few attempts + can be awaited by the JoinCallAsync + support reconnections
- private async Task UpdateLocationHintAsync()
+ private async Task UpdateLocationHintAsync(CancellationToken cancellationToken)
{
var headers = new List>>();
- await _httpClient.HeadAsync(LocationHintWebUri, headers);
+ await _httpClient.HeadAsync(LocationHintWebUri, headers, cancellationToken);
var locationHeader = headers.FirstOrDefault(_ => _.Key.ToLower() == LocationHintHeaderKey);
if (locationHeader.Key.IsNullOrEmpty() || !locationHeader.Value.Any())
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/UnityWebRtcWrapperExtensions.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/UnityWebRtcWrapperExtensions.cs
index 802ec06a..f20ce6f4 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/UnityWebRtcWrapperExtensions.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/UnityWebRtcWrapperExtensions.cs
@@ -1,4 +1,5 @@
using System;
+using System.Threading;
using System.Threading.Tasks;
using Unity.WebRTC;
@@ -8,35 +9,36 @@ namespace StreamVideo.Core.LowLevelClient
internal static class UnityWebRtcWrapperExtensions
{
//StreamTodo: in webRTC example they also check for _peerConnection.SignalingState != RTCSignalingState.Stable
- public static Task CreateOfferAsync(this RTCPeerConnection peerConnection)
- => WaitForOperationAsync(peerConnection.CreateOffer(), r => r.Desc);
+ public static Task CreateOfferAsync(this RTCPeerConnection peerConnection, CancellationToken cancellationToken)
+ => WaitForOperationAsync(peerConnection.CreateOffer(), r => r.Desc, cancellationToken);
public static Task CreateOfferAsync(this RTCPeerConnection peerConnection,
- ref RTCOfferAnswerOptions options)
- => WaitForOperationAsync(peerConnection.CreateOffer(ref options), r => r.Desc);
+ ref RTCOfferAnswerOptions options, CancellationToken cancellationToken)
+ => WaitForOperationAsync(peerConnection.CreateOffer(ref options), r => r.Desc, cancellationToken);
- public static Task CreateAnswerAsync(this RTCPeerConnection peerConnection)
- => WaitForOperationAsync(peerConnection.CreateAnswer(), r => r.Desc);
+ public static Task CreateAnswerAsync(this RTCPeerConnection peerConnection, CancellationToken cancellationToken)
+ => WaitForOperationAsync(peerConnection.CreateAnswer(), r => r.Desc, cancellationToken);
public static Task SetLocalDescriptionAsync(this RTCPeerConnection peerConnection,
- ref RTCSessionDescription desc)
- => WaitForOperationAsync(peerConnection.SetLocalDescription(ref desc));
+ ref RTCSessionDescription desc, CancellationToken cancellationToken)
+ => WaitForOperationAsync(peerConnection.SetLocalDescription(ref desc), cancellationToken);
public static Task SetRemoteDescriptionAsync(this RTCPeerConnection peerConnection,
- ref RTCSessionDescription desc)
- => WaitForOperationAsync(peerConnection.SetRemoteDescription(ref desc));
+ ref RTCSessionDescription desc, CancellationToken cancellationToken)
+ => WaitForOperationAsync(peerConnection.SetRemoteDescription(ref desc), cancellationToken);
- public static Task GetStatsAsync(this RTCPeerConnection peerConnection)
- => WaitForOperationAsync(peerConnection.GetStats(), r => r.Value);
+ public static Task GetStatsAsync(this RTCPeerConnection peerConnection, CancellationToken cancellationToken)
+ => WaitForOperationAsync(peerConnection.GetStats(), r => r.Value, cancellationToken);
private static async Task WaitForOperationAsync(
- this TOperation asyncOperation, Func response)
+ this TOperation asyncOperation, Func response, CancellationToken cancellationToken)
where TOperation : AsyncOperationBase
{
// StreamTodo: refactor to use coroutine runner to reduce runtime allocations
while (!asyncOperation.IsDone)
{
- await Task.Delay(1);
+ cancellationToken.ThrowIfCancellationRequested();
+ await Task.Delay(1, cancellationToken);
}
if (asyncOperation.IsError)
@@ -48,13 +50,14 @@ private static async Task WaitForOperationAsync(
- this TOperation asyncOperation)
+ this TOperation asyncOperation, CancellationToken cancellationToken)
where TOperation : AsyncOperationBase
{
// StreamTodo: refactor to use coroutine runner to reduce runtime allocations
while (!asyncOperation.IsDone)
{
- await Task.Delay(1);
+ cancellationToken.ThrowIfCancellationRequested();
+ await Task.Delay(1, cancellationToken);
}
if (asyncOperation.IsError)
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/BasePersistentWebSocket.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/BasePersistentWebSocket.cs
index c49b1385..cbab6449 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/BasePersistentWebSocket.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/BasePersistentWebSocket.cs
@@ -22,6 +22,8 @@ internal abstract class BasePersistentWebSocket : IPersistentWebSocket
public event Action Connected;
public event Action Disconnected;
+ public const int ConnectTimeoutMs = 1000;
+
public ConnectionState ConnectionState
{
get => _connectionState;
@@ -78,7 +80,9 @@ private void TryToReconnect()
Logs.Info($"{GetType()} TryToReconnect");
#endif
- ConnectAsync().LogIfFailed();
+ var cts = new CancellationTokenSource();
+ cts.CancelAfter(ConnectTimeoutMs);
+ ConnectAsync(cts.Token).LogIfFailed();
}
public Task ConnectAsync(CancellationToken cancellationToken = default)
@@ -105,7 +109,7 @@ public async Task DisconnectAsync(WebSocketCloseStatus closeStatus, string close
}
ConnectionState = ConnectionState.Disconnecting;
-
+
await OnDisconnectingAsync(closeMessage);
if (WebsocketClient == null)
@@ -224,6 +228,7 @@ protected void OnHealthCheckReceived()
protected virtual Task OnDisconnectingAsync(string closeMessage)
{
+ _reconnectScheduler.Reset();
return Task.CompletedTask;
}
@@ -338,6 +343,11 @@ private TEvent DeserializeEvent(string content, out TDto dto)
private void OnReconnectionScheduled()
{
+ if (!NextReconnectTime.HasValue)
+ {
+ throw new ArgumentNullException(nameof(NextReconnectTime));
+ }
+
ConnectionState = ConnectionState.WaitToReconnect;
var timeLeft = NextReconnectTime.Value - TimeService.Time;
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/CoordinatorWebSocket.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/CoordinatorWebSocket.cs
index e1e52685..1bec2312 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/CoordinatorWebSocket.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/CoordinatorWebSocket.cs
@@ -73,7 +73,7 @@ protected override async Task ExecuteConnectAsync(CancellationToken cancellation
var uri = UriFactory.CreateCoordinatorConnectionUri();
//StreamTodo: Add cancel token support to WS
- await WebsocketClient.ConnectAsync(uri);
+ await WebsocketClient.ConnectAsync(uri, cancellationToken);
#if STREAM_DEBUG_ENABLED
Logs.Info("Coordinator connected! Let's send the connect message");
diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs
index 9d22123e..2a907622 100644
--- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs
+++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs
@@ -179,7 +179,7 @@ protected override async Task ExecuteConnectAsync(CancellationToken cancellation
var sfuUri = UriFactory.CreateSfuConnectionUri(_sfuUrl);
- await WebsocketClient.ConnectAsync(sfuUri);
+ await WebsocketClient.ConnectAsync(sfuUri, cancellationToken);
//StreamTodo: review when is the actual "connected state" - perhaps not the WS connection itself but receiving an appropriate event should set the flag
//e.g. are we able to send any data as soon as the connection is established?
@@ -340,7 +340,7 @@ private void OnHandleJoinResponse(JoinResponse joinResponse)
{
ConnectionState = ConnectionState.Connected;
- _connectUserTaskSource.SetResult(true);
+ _connectUserTaskSource.TrySetResult(true);
_connectUserTaskSource = null;
JoinResponse?.Invoke(joinResponse);
diff --git a/Packages/StreamVideo/Runtime/Core/Stats/IWebRtcStatsCollector.cs b/Packages/StreamVideo/Runtime/Core/Stats/IWebRtcStatsCollector.cs
index 29c2db70..ea7367a9 100644
--- a/Packages/StreamVideo/Runtime/Core/Stats/IWebRtcStatsCollector.cs
+++ b/Packages/StreamVideo/Runtime/Core/Stats/IWebRtcStatsCollector.cs
@@ -1,4 +1,5 @@
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
using StreamVideo.v1.Sfu.Models;
@@ -6,14 +7,14 @@ namespace StreamVideo.Core.Stats
{
internal interface IWebRtcStatsCollector
{
- Task GetPublisherStatsJsonAsync();
+ Task GetPublisherStatsJsonAsync(CancellationToken cancellationToken);
- Task GetSubscriberStatsJsonAsync();
+ Task GetSubscriberStatsJsonAsync(CancellationToken cancellationToken);
- Task GetRtcStatsJsonAsync();
+ Task GetRtcStatsJsonAsync(CancellationToken cancellationToken);
- Task> GetEncodeStatsAsync();
+ Task> GetEncodeStatsAsync(CancellationToken cancellationToken);
- Task> GetDecodeStatsAsync();
+ Task> GetDecodeStatsAsync(CancellationToken cancellationToken);
}
}
\ No newline at end of file
diff --git a/Packages/StreamVideo/Runtime/Core/Stats/UnityWebRtcStatsCollector.cs b/Packages/StreamVideo/Runtime/Core/Stats/UnityWebRtcStatsCollector.cs
index 54918142..0a47720f 100644
--- a/Packages/StreamVideo/Runtime/Core/Stats/UnityWebRtcStatsCollector.cs
+++ b/Packages/StreamVideo/Runtime/Core/Stats/UnityWebRtcStatsCollector.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
+using System.Threading;
using System.Threading.Tasks;
using StreamVideo.Core.LowLevelClient;
using StreamVideo.Core.Trace;
@@ -34,23 +35,23 @@ private class StatSnapshot
public Dictionary Dict { get; set; }
}
- public async Task GetPublisherStatsJsonAsync()
+ public async Task GetPublisherStatsJsonAsync(CancellationToken cancellationToken)
{
- var report = await _rtcSession.Publisher.GetStatsReportAsync();
+ var report = await _rtcSession.Publisher.GetStatsReportAsync(cancellationToken);
return ConvertStatsToJson(report.Stats);
}
- public async Task GetSubscriberStatsJsonAsync()
+ public async Task GetSubscriberStatsJsonAsync(CancellationToken cancellationToken)
{
- var report = await _rtcSession.Subscriber.GetStatsReportAsync();
+ var report = await _rtcSession.Subscriber.GetStatsReportAsync(cancellationToken);
return ConvertStatsToJson(report.Stats);
}
- public async Task GetRtcStatsJsonAsync()
+ public async Task GetRtcStatsJsonAsync(CancellationToken cancellationToken)
{
// Get both publisher and subscriber stats
- var publisherReport = await _rtcSession.Publisher.GetStatsReportAsync();
- var subscriberReport = await _rtcSession.Subscriber.GetStatsReportAsync();
+ var publisherReport = await _rtcSession.Publisher.GetStatsReportAsync(cancellationToken);
+ var subscriberReport = await _rtcSession.Subscriber.GetStatsReportAsync(cancellationToken);
// Compute delta-compressed stats
var publisherDelta = ComputeDeltaCompression(_previousPublisherStats, publisherReport.Stats);
@@ -239,15 +240,15 @@ private Dictionary ComputeDeltaCompression(
return result;
}
- public async Task> GetEncodeStatsAsync()
+ public async Task> GetEncodeStatsAsync(CancellationToken cancellationToken)
{
- var report = await _rtcSession.Publisher.GetStatsReportAsync();
+ var report = await _rtcSession.Publisher.GetStatsReportAsync(cancellationToken);
return ComputeEncodeStats(report.Stats);
}
- public async Task> GetDecodeStatsAsync()
+ public async Task> GetDecodeStatsAsync(CancellationToken cancellationToken)
{
- var report = await _rtcSession.Subscriber.GetStatsReportAsync();
+ var report = await _rtcSession.Subscriber.GetStatsReportAsync(cancellationToken);
return ComputeDecodeStats(report.Stats);
}
diff --git a/Packages/StreamVideo/Runtime/Core/Stats/WebRtcStatsSender.cs b/Packages/StreamVideo/Runtime/Core/Stats/WebRtcStatsSender.cs
index 9adde97d..b991150e 100644
--- a/Packages/StreamVideo/Runtime/Core/Stats/WebRtcStatsSender.cs
+++ b/Packages/StreamVideo/Runtime/Core/Stats/WebRtcStatsSender.cs
@@ -1,4 +1,5 @@
using System;
+using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.Collections;
using StreamVideo.Core.LowLevelClient;
@@ -21,10 +22,15 @@ public void Update()
if (_timeService.Time > _lastTimeSent + SendInterval && _currentSendTask == null)
{
- _currentSendTask = CollectAndSend().ContinueWith(t =>
+ //StreamTODO: consider adding cancellation token support here -> leaving the call should cancel any ongoing operations
+ _currentSendTask = CollectAndSend(CancellationToken.None).ContinueWith(t =>
{
_currentSendTask = null;
- t.LogIfFailed();
+
+ if (_rtcSession.CallState == CallingState.Joining)
+ {
+ t.LogIfFailed();
+ }
});
_lastTimeSent = _timeService.Time;
}
@@ -34,7 +40,7 @@ public void Update()
/// Sends final stats immediately, flushing any remaining trace data.
/// Called when leaving a call to ensure all stats are captured.
///
- public async Task SendFinalStatsAsync()
+ public async Task SendFinalStatsAsync(CancellationToken cancellationToken)
{
if (_rtcSession.ActiveCall == null)
{
@@ -46,7 +52,7 @@ public async Task SendFinalStatsAsync()
await _currentSendTask;
}
- await CollectAndSend();
+ await CollectAndSend(cancellationToken);
}
internal WebRtcStatsSender(RtcSession rtcSession, IWebRtcStatsCollector webRtcStatsCollector,
@@ -74,18 +80,18 @@ internal WebRtcStatsSender(RtcSession rtcSession, IWebRtcStatsCollector webRtcSt
private float _lastTimeSent;
private Task _currentSendTask;
- private async Task CollectAndSend()
+ private async Task CollectAndSend(CancellationToken cancellationToken)
{
if (_rtcSession.ActiveCall == null)
{
return;
}
- var subscriberStatsJson = await _webRtcStatsCollector.GetSubscriberStatsJsonAsync();
- var publisherStatsJson = await _webRtcStatsCollector.GetPublisherStatsJsonAsync();
- var rtcStatsJson = await _webRtcStatsCollector.GetRtcStatsJsonAsync();
- var encodeStats = await _webRtcStatsCollector.GetEncodeStatsAsync();
- var decodeStats = await _webRtcStatsCollector.GetDecodeStatsAsync();
+ var subscriberStatsJson = await _webRtcStatsCollector.GetSubscriberStatsJsonAsync(cancellationToken);
+ var publisherStatsJson = await _webRtcStatsCollector.GetPublisherStatsJsonAsync(cancellationToken);
+ var rtcStatsJson = await _webRtcStatsCollector.GetRtcStatsJsonAsync(cancellationToken);
+ var encodeStats = await _webRtcStatsCollector.GetEncodeStatsAsync(cancellationToken);
+ var decodeStats = await _webRtcStatsCollector.GetDecodeStatsAsync(cancellationToken);
if (subscriberStatsJson == null || publisherStatsJson == null || rtcStatsJson == null)
{
@@ -130,7 +136,8 @@ private async Task CollectAndSend()
#endif
#pragma warning restore CS0162 // Re-enable unreachable code warning
- await _rtcSession.SendWebRtcStats(request);
+ cancellationToken.ThrowIfCancellationRequested();
+ await _rtcSession.SendWebRtcStats(request, cancellationToken);
}
}
}
\ No newline at end of file
diff --git a/Packages/StreamVideo/Runtime/Core/StreamVideoClient.cs b/Packages/StreamVideo/Runtime/Core/StreamVideoClient.cs
index 9c97c8c9..b4eb3a07 100644
--- a/Packages/StreamVideo/Runtime/Core/StreamVideoClient.cs
+++ b/Packages/StreamVideo/Runtime/Core/StreamVideoClient.cs
@@ -5,10 +5,12 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using StreamVideo.Core.QueryBuilders.Sort.Calls;
using StreamVideo.Core.Configs;
using StreamVideo.Core.DeviceManagers;
+using StreamVideo.Core.Exceptions;
using StreamVideo.Core.InternalDTO.Events;
using StreamVideo.Core.InternalDTO.Requests;
using StreamVideo.Core.LowLevelClient;
@@ -40,17 +42,19 @@ public enum DisconnectReason
/// SFU server disconnected
///
SfuWsDisconnected,
-
+
///
///
///
CoordinatorWsDisconnected,
+
+ VideoServerDisconnected,
}
-
+
public delegate void CallHandler(IStreamCall call);
public delegate void ConnectHandler(IStreamVideoUser localUser);
-
+
public delegate void DisconnectedHandler(DisconnectReason reason);
public class StreamVideoClient : IStreamVideoClient, IInternalStreamVideoClient
@@ -107,56 +111,68 @@ public static string CreateDeveloperAuthToken(string userId)
///
public static string SanitizeUserId(string userId) => StreamVideoLowLevelClient.SanitizeUserId(userId);
- //StreamTODO: this throws exception if the call doesn't exist. Check with other SDKs what is the expected behavior
- ///
- /// Will return null if the call doesn't exist
- ///
- public async Task GetCallAsync(StreamCallType callType, string callId)
+ public Task GetCallAsync(StreamCallType callType, string callId)
+ => GetCallAsync(callType, callId, CancellationToken.None);
+
+ public async Task GetCallAsync(StreamCallType callType, string callId,
+ CancellationToken cancellationToken)
{
//StreamTodo: validate input
var callData
= await InternalLowLevelClient.InternalVideoClientApi.GetCallAsync(callType, callId,
- new GetOrCreateCallRequestInternalDTO());
+ new GetOrCreateCallRequestInternalDTO(), cancellationToken);
return _cache.TryCreateOrUpdate(callData);
}
+ public Task GetOrCreateCallAsync(StreamCallType callType, string callId)
+ => GetOrCreateCallAsync(callType, callId, CancellationToken.None);
+
//StreamTodo: add more params (same as in JoinCallAsync) + add to interface
- public async Task GetOrCreateCallAsync(StreamCallType callType, string callId)
+ public async Task GetOrCreateCallAsync(StreamCallType callType, string callId, CancellationToken cancellationToken)
{
//StreamTodo: validate input
var callData
= await InternalLowLevelClient.InternalVideoClientApi.GetOrCreateCallAsync(callType, callId,
- new GetOrCreateCallRequestInternalDTO());
+ new GetOrCreateCallRequestInternalDTO(), cancellationToken);
//StreamTodo: what if null?
return _cache.TryCreateOrUpdate(callData);
}
+ public Task JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring,
+ bool notify)
+ => JoinCallAsync(callType, callId, create, ring, notify, CancellationToken.None);
+
//StreamTodo: if ring and notify can't be both true then perhaps enum NotifyMode.Ring, NotifyMode.Notify?
//StreamTodo: add CreateCallOptions
public async Task JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring,
- bool notify)
+ bool notify, CancellationToken cancellationToken)
{
- //StreamTodo: check if we're already in a call?
+ var callState = InternalLowLevelClient.RtcSession.CallState;
+ if (callState == CallingState.Joined || callState == CallingState.Joining)
+ {
+ throw new StreamCallInProgressException("Cannot join a call while another call is active or joining.");
+ }
IStreamCall call;
if (!create)
{
//StreamTodo: check android SDK if the flow is the same
- call = await GetCallAsync(callType, callId);
+ call = await GetCallAsync(callType, callId, cancellationToken);
if (call == null)
{
- throw new InvalidOperationException($"Call with id `{callId}` was not found");
+ throw new StreamCallNotFoundException(
+ $"Call with type: `{callType}`, and ID: `{callId}` was not found. Both type and ID are required to identify a call.");
}
}
else
{
- call = await GetOrCreateCallAsync(callType, callId);
+ call = await GetOrCreateCallAsync(callType, callId, cancellationToken);
}
// StreamTodo: check state if we don't have an active session already
- var locationHint = await InternalLowLevelClient.GetLocationHintAsync();
+ var locationHint = await InternalLowLevelClient.GetLocationHintAsync(cancellationToken);
//StreamTodo: move this logic to call.Join, this way user can create call object and join later on
@@ -169,8 +185,9 @@ public async Task JoinCallAsync(StreamCallType callType, string cal
Custom = null,
Members = null,
SettingsOverride = null,
- StartsAt = DateTimeOffset
- .Now, //StreamTODO: check this, if we're just joining another call perhaps we shouldn't set this?
+
+ //StreamTODO: check this, if we're just joining another call perhaps we shouldn't set this?
+ StartsAt = DateTimeOffset.Now,
Team = null
},
Location = locationHint,
@@ -184,7 +201,7 @@ var joinCallResponse
= await InternalLowLevelClient.InternalVideoClientApi.JoinCallAsync(callType, callId, joinCallRequest);
_cache.TryCreateOrUpdate(joinCallResponse);
- await InternalLowLevelClient.StartCallSessionAsync((StreamCall)call);
+ await InternalLowLevelClient.StartCallSessionAsync((StreamCall)call, cancellationToken);
CallStarted?.Invoke(call);
return call;
@@ -206,9 +223,12 @@ public void Dispose()
//StreamTodo: Consider removing this overload and exposing ConnectAsync() DisconnectAsync() only. The config would contain credentials (token or token provider), etc.
//Similar to Android SDK: https://getstream.io/video/docs/android/guides/client-auth/
- public async Task ConnectUserAsync(AuthCredentials credentials)
+ public Task ConnectUserAsync(AuthCredentials credentials)
+ => ConnectUserAsync(credentials, CancellationToken.None);
+
+ public async Task ConnectUserAsync(AuthCredentials credentials, CancellationToken cancellationToken)
{
- await InternalLowLevelClient.ConnectUserAsync(credentials);
+ await InternalLowLevelClient.ConnectUserAsync(credentials, cancellationToken);
#if STREAM_DEBUG_ENABLED
_logs.Warning("StreamVideoClient - CONNECTION - Trigger Connected event.");
@@ -458,9 +478,11 @@ private async Task LeaveCallAsync(IStreamCall call)
var callState = InternalLowLevelClient.RtcSession.CallState;
if (callState == CallingState.Leaving || callState == CallingState.Offline)
{
- _logs.Warning($"{nameof(LeaveCallAsync)}: Call is already leaving or offline, skipping leave operation.");
+ _logs.Warning(
+ $"{nameof(LeaveCallAsync)}: Call is already leaving or offline, skipping leave operation.");
return;
}
+
await InternalLowLevelClient.RtcSession.StopAsync("User is leaving the call");
}
catch (Exception e)
@@ -527,14 +549,21 @@ private void SubscribeTo(StreamVideoLowLevelClient lowLevelClient)
lowLevelClient.InternalCallSessionStartedEvent += OnInternalCallSessionStartedEvent;
lowLevelClient.InternalCallSessionParticipantJoinedEvent += OnInternalCallSessionParticipantJoinedEvent;
lowLevelClient.InternalCallSessionParticipantLeftEvent += OnInternalCallSessionParticipantLeftEvent;
- lowLevelClient.InternalCallSessionParticipantCountsUpdatedEvent += OnInternalCallSessionParticipantCountsUpdatedEvent;
+ lowLevelClient.InternalCallSessionParticipantCountsUpdatedEvent
+ += OnInternalCallSessionParticipantCountsUpdatedEvent;
lowLevelClient.InternalConnectionErrorEvent += OnInternalConnectionErrorEvent;
lowLevelClient.InternalCustomVideoEvent += OnInternalCustomVideoEvent;
lowLevelClient.Connected += InternalLowLevelClientOnConnected;
-
+
lowLevelClient.Disconnected += OnLowLevelClientDisconnected;
lowLevelClient.SfuDisconnected += OnLowLevelClientSfuDisconnected;
+ lowLevelClient.RtcSession.PeerConnectionDisconnectedDuringSession += OnRtcPeerConnectionDisconnectedDuringSession;
+ }
+
+ private void OnRtcPeerConnectionDisconnectedDuringSession()
+ {
+ Disconnected?.Invoke(DisconnectReason.VideoServerDisconnected);
}
private void UnsubscribeFrom(StreamVideoLowLevelClient lowLevelClient)
@@ -564,14 +593,17 @@ private void UnsubscribeFrom(StreamVideoLowLevelClient lowLevelClient)
lowLevelClient.InternalCallSessionStartedEvent -= OnInternalCallSessionStartedEvent;
lowLevelClient.InternalCallSessionParticipantJoinedEvent -= OnInternalCallSessionParticipantJoinedEvent;
lowLevelClient.InternalCallSessionParticipantLeftEvent -= OnInternalCallSessionParticipantLeftEvent;
- lowLevelClient.InternalCallSessionParticipantCountsUpdatedEvent -= OnInternalCallSessionParticipantCountsUpdatedEvent;
+ lowLevelClient.InternalCallSessionParticipantCountsUpdatedEvent
+ -= OnInternalCallSessionParticipantCountsUpdatedEvent;
lowLevelClient.InternalConnectionErrorEvent -= OnInternalConnectionErrorEvent;
lowLevelClient.InternalCustomVideoEvent -= OnInternalCustomVideoEvent;
lowLevelClient.Connected -= InternalLowLevelClientOnConnected;
-
+
lowLevelClient.Disconnected -= OnLowLevelClientDisconnected;
lowLevelClient.SfuDisconnected -= OnLowLevelClientSfuDisconnected;
+
+ lowLevelClient.RtcSession.PeerConnectionDisconnectedDuringSession -= OnRtcPeerConnectionDisconnectedDuringSession;
}
private void InternalLowLevelClientOnConnected()
@@ -767,7 +799,7 @@ private void OnInternalCallSessionStartedEvent(CallSessionStartedEventInternalDT
private void OnInternalCallSessionParticipantJoinedEvent(CallSessionParticipantJoinedEventInternalDTO eventData)
{
- if(ActiveCall == null || ActiveCall.Cid != eventData.CallCid)
+ if (ActiveCall == null || ActiveCall.Cid != eventData.CallCid)
{
return;
}
@@ -777,21 +809,22 @@ private void OnInternalCallSessionParticipantJoinedEvent(CallSessionParticipantJ
private void OnInternalCallSessionParticipantLeftEvent(CallSessionParticipantLeftEventInternalDTO eventData)
{
- if(ActiveCall == null || ActiveCall.Cid != eventData.CallCid)
+ if (ActiveCall == null || ActiveCall.Cid != eventData.CallCid)
{
return;
}
-
+
InternalLowLevelClient.RtcSession.ActiveCall.UpdateFromCoordinator(eventData, _cache);
}
- private void OnInternalCallSessionParticipantCountsUpdatedEvent(CallSessionParticipantCountsUpdatedEventInternalDTO eventData)
+ private void OnInternalCallSessionParticipantCountsUpdatedEvent(
+ CallSessionParticipantCountsUpdatedEventInternalDTO eventData)
{
- if(ActiveCall == null || ActiveCall.Cid != eventData.CallCid)
+ if (ActiveCall == null || ActiveCall.Cid != eventData.CallCid)
{
return;
}
-
+
InternalLowLevelClient.RtcSession.ActiveCall.UpdateFromCoordinator(eventData);
}
@@ -817,7 +850,7 @@ private void OnInternalCustomVideoEvent(CustomVideoEventInternalDTO eventData)
activeCall.NotifyCallEventReceived(callEvent);
}
-
+
private void OnLowLevelClientSfuDisconnected() => Disconnected?.Invoke(DisconnectReason.SfuWsDisconnected);
private void OnLowLevelClientDisconnected() => Disconnected?.Invoke(DisconnectReason.CoordinatorWsDisconnected);
diff --git a/Packages/StreamVideo/Runtime/Libs/Http/HttpClientAdapter.cs b/Packages/StreamVideo/Runtime/Libs/Http/HttpClientAdapter.cs
index 9d93382e..e04aaa27 100644
--- a/Packages/StreamVideo/Runtime/Libs/Http/HttpClientAdapter.cs
+++ b/Packages/StreamVideo/Runtime/Libs/Http/HttpClientAdapter.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
+using System.Threading;
using System.Threading.Tasks;
namespace StreamVideo.Libs.Http
@@ -23,7 +24,7 @@ public void AddDefaultCustomHeader(string key, string value)
=> _httpClient.DefaultRequestHeaders.Add(key, value);
public async Task SendHttpRequestAsync(HttpMethodType methodType, Uri uri,
- object optionalRequestContent)
+ object optionalRequestContent, CancellationToken cancellationToken = default)
{
var httpContent = TryGetHttpContent(optionalRequestContent);
@@ -31,13 +32,13 @@ Task ExecuteAsync()
{
switch (methodType)
{
- case HttpMethodType.Get: return _httpClient.GetAsync(uri);
- case HttpMethodType.Post: return _httpClient.PostAsync(uri, httpContent);
- case HttpMethodType.Put: return _httpClient.PutAsync(uri, httpContent);
+ case HttpMethodType.Get: return _httpClient.GetAsync(uri, cancellationToken);
+ case HttpMethodType.Post: return _httpClient.PostAsync(uri, httpContent, cancellationToken);
+ case HttpMethodType.Put: return _httpClient.PutAsync(uri, httpContent, cancellationToken);
case HttpMethodType.Patch:
return _httpClient.SendAsync(new HttpRequestMessage(new HttpMethod("PATCH"), uri)
- { Content = httpContent });
- case HttpMethodType.Delete: return _httpClient.DeleteAsync(uri);
+ { Content = httpContent }, cancellationToken);
+ case HttpMethodType.Delete: return _httpClient.DeleteAsync(uri, cancellationToken);
default:
throw new ArgumentOutOfRangeException(nameof(methodType), methodType, null);
}
@@ -47,38 +48,38 @@ Task ExecuteAsync()
return await HttpResponse.CreateFromHttpResponseMessageAsync(httpResponseMessage);
}
- public async Task GetAsync(Uri uri)
+ public async Task GetAsync(Uri uri, CancellationToken cancellationToken = default)
{
- var response = await _httpClient.GetAsync(uri);
+ var response = await _httpClient.GetAsync(uri, cancellationToken);
return await HttpResponse.CreateFromHttpResponseMessageAsync(response);
}
- public async Task PostAsync(Uri uri, object content)
+ public async Task PostAsync(Uri uri, object content, CancellationToken cancellationToken = default)
{
var httpContent = TryGetHttpContent(content);
var response = await _httpClient.PostAsync(uri, httpContent);
return await HttpResponse.CreateFromHttpResponseMessageAsync(response);
}
- public async Task PutAsync(Uri uri, object content)
+ public async Task PutAsync(Uri uri, object content, CancellationToken cancellationToken = default)
{
var httpContent = TryGetHttpContent(content);
var response = await _httpClient.PutAsync(uri, httpContent);
return await HttpResponse.CreateFromHttpResponseMessageAsync(response);
}
- public async Task PatchAsync(Uri uri, object content)
+ public async Task PatchAsync(Uri uri, object content, CancellationToken cancellationToken = default)
{
var httpContent = TryGetHttpContent(content);
var response = await _httpClient.SendAsync(new HttpRequestMessage(new HttpMethod("PATCH"), uri)
- { Content = httpContent });
+ { Content = httpContent }, cancellationToken);
return await HttpResponse.CreateFromHttpResponseMessageAsync(response);
}
public async Task HeadAsync(Uri uri,
- ICollection>> resultHeaders = null)
+ ICollection>> resultHeaders = null, CancellationToken cancellationToken = default)
{
- var response = await _httpClient.SendAsync(new HttpRequestMessage(new HttpMethod("HEAD"), uri));
+ var response = await _httpClient.SendAsync(new HttpRequestMessage(new HttpMethod("HEAD"), uri), cancellationToken);
if (resultHeaders != null)
{
foreach (var header in response.Headers)
@@ -90,9 +91,9 @@ public async Task HeadAsync(Uri uri,
return await HttpResponse.CreateFromHttpResponseMessageAsync(response);
}
- public async Task DeleteAsync(Uri uri)
+ public async Task DeleteAsync(Uri uri, CancellationToken cancellationToken = default)
{
- var response = await _httpClient.DeleteAsync(uri);
+ var response = await _httpClient.DeleteAsync(uri, cancellationToken);
return await HttpResponse.CreateFromHttpResponseMessageAsync(response);
}
diff --git a/Packages/StreamVideo/Runtime/Libs/Http/IHttpClient.cs b/Packages/StreamVideo/Runtime/Libs/Http/IHttpClient.cs
index 2b41ed34..a0389b29 100644
--- a/Packages/StreamVideo/Runtime/Libs/Http/IHttpClient.cs
+++ b/Packages/StreamVideo/Runtime/Libs/Http/IHttpClient.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
namespace StreamVideo.Libs.Http
@@ -13,18 +14,19 @@ public interface IHttpClient
void AddDefaultCustomHeader(string key, string value);
- Task GetAsync(Uri uri);
+ Task GetAsync(Uri uri, CancellationToken cancellationToken = default);
- Task PostAsync(Uri uri, object content);
+ Task PostAsync(Uri uri, object content, CancellationToken cancellationToken = default);
- Task PutAsync(Uri uri, object content);
+ Task PutAsync(Uri uri, object content, CancellationToken cancellationToken = default);
- Task PatchAsync(Uri uri, object content);
+ Task PatchAsync(Uri uri, object content, CancellationToken cancellationToken = default);
- Task DeleteAsync(Uri uri);
+ Task DeleteAsync(Uri uri, CancellationToken cancellationToken = default);
- Task SendHttpRequestAsync(HttpMethodType methodType, Uri uri, object optionalRequestContent);
+ Task SendHttpRequestAsync(HttpMethodType methodType, Uri uri, object optionalRequestContent, CancellationToken cancellationToken = default);
- Task HeadAsync(Uri uri, ICollection>> resultHeaders = null);
+ Task HeadAsync(Uri uri,
+ ICollection>> resultHeaders = null, CancellationToken cancellationToken = default);
}
}
\ No newline at end of file
diff --git a/Packages/StreamVideo/Runtime/Libs/Http/UnityWebRequestHttpClient.cs b/Packages/StreamVideo/Runtime/Libs/Http/UnityWebRequestHttpClient.cs
index f4100ad1..93f750bd 100644
--- a/Packages/StreamVideo/Runtime/Libs/Http/UnityWebRequestHttpClient.cs
+++ b/Packages/StreamVideo/Runtime/Libs/Http/UnityWebRequestHttpClient.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using UnityEngine.Networking;
@@ -15,28 +16,31 @@ public class UnityWebRequestHttpClient : IHttpClient
public void AddDefaultCustomHeader(string key, string value) => _headers[key] = value;
- public Task GetAsync(Uri uri) => SendWebRequest(uri, UnityWebRequest.kHttpVerbGET);
+ public Task GetAsync(Uri uri, CancellationToken cancellationToken = default)
+ => SendWebRequest(uri, UnityWebRequest.kHttpVerbGET, cancellationToken: cancellationToken);
- public Task PostAsync(Uri uri, object content)
- => SendWebRequest(uri, UnityWebRequest.kHttpVerbPOST, content);
+ public Task PostAsync(Uri uri, object content, CancellationToken cancellationToken = default)
+ => SendWebRequest(uri, UnityWebRequest.kHttpVerbPOST, content, cancellationToken: cancellationToken);
- public Task PutAsync(Uri uri, object content)
- => SendWebRequest(uri, UnityWebRequest.kHttpVerbPUT, content);
+ public Task PutAsync(Uri uri, object content, CancellationToken cancellationToken = default)
+ => SendWebRequest(uri, UnityWebRequest.kHttpVerbPUT, content, cancellationToken: cancellationToken);
- public Task PatchAsync(Uri uri, object content) => SendWebRequest(uri, HttpPatchMethod, content);
+ public Task PatchAsync(Uri uri, object content, CancellationToken cancellationToken = default)
+ => SendWebRequest(uri, HttpPatchMethod, content, cancellationToken: cancellationToken);
- public Task DeleteAsync(Uri uri) => SendWebRequest(uri, UnityWebRequest.kHttpVerbDELETE);
+ public Task DeleteAsync(Uri uri, CancellationToken cancellationToken = default)
+ => SendWebRequest(uri, UnityWebRequest.kHttpVerbDELETE, cancellationToken: cancellationToken);
public Task SendHttpRequestAsync(HttpMethodType methodType, Uri uri,
- object optionalRequestContent)
+ object optionalRequestContent, CancellationToken cancellationToken = default)
{
var httpMethodKey = GetHttpMethodKey(methodType);
- return SendWebRequest(uri, httpMethodKey, optionalRequestContent);
+ return SendWebRequest(uri, httpMethodKey, optionalRequestContent, cancellationToken: cancellationToken);
}
public Task HeadAsync(Uri uri,
- ICollection>> resultHeaders)
- => SendWebRequest(uri, HttpHeadMethod, resultHeaders: resultHeaders);
+ ICollection>> resultHeaders, CancellationToken cancellationToken)
+ => SendWebRequest(uri, HttpHeadMethod, resultHeaders: resultHeaders, cancellationToken: cancellationToken);
private const string HttpHeadMethod = "HEAD";
private const string HttpPatchMethod = "PATCH";
@@ -60,7 +64,8 @@ private static string GetHttpMethodKey(HttpMethodType methodType)
//StreamTodo: add cancellationToken support that will trigger https://docs.unity3d.com/ScriptReference/Networking.UnityWebRequest.Abort.html
//StreamTodo: refactor to remove duplication
private async Task SendWebRequest(Uri uri, string httpMethod,
- object optionalContent = null, ICollection>> resultHeaders = null)
+ object optionalContent = null, ICollection>> resultHeaders = null,
+ CancellationToken cancellationToken = default)
{
if (optionalContent is FileWrapper fileWrapper)
{
@@ -84,6 +89,7 @@ private async Task SendWebRequest(Uri uri, string httpMethod,
while (!asyncOperation.isDone)
{
+ cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
}
@@ -131,6 +137,7 @@ private async Task SendWebRequest(Uri uri, string httpMethod,
while (!asyncOperation.isDone)
{
+ cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
}
diff --git a/Packages/StreamVideo/Runtime/Libs/Websockets/IWebsocketClient.cs b/Packages/StreamVideo/Runtime/Libs/Websockets/IWebsocketClient.cs
index 2a41f90a..8b9bd112 100644
--- a/Packages/StreamVideo/Runtime/Libs/Websockets/IWebsocketClient.cs
+++ b/Packages/StreamVideo/Runtime/Libs/Websockets/IWebsocketClient.cs
@@ -1,5 +1,6 @@
using System;
using System.Net.WebSockets;
+using System.Threading;
using System.Threading.Tasks;
namespace StreamVideo.Libs.Websockets
@@ -18,7 +19,7 @@ public interface IWebsocketClient : IDisposable
bool TryDequeueMessage(out byte[] message);
- Task ConnectAsync(Uri serverUri);
+ Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken);
void Update();
diff --git a/Packages/StreamVideo/Runtime/Libs/Websockets/NativeWebSocketWrapper.cs b/Packages/StreamVideo/Runtime/Libs/Websockets/NativeWebSocketWrapper.cs
index 23422f7d..7878c407 100644
--- a/Packages/StreamVideo/Runtime/Libs/Websockets/NativeWebSocketWrapper.cs
+++ b/Packages/StreamVideo/Runtime/Libs/Websockets/NativeWebSocketWrapper.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using NativeWebSocket;
using StreamVideo.Libs.Utils;
@@ -37,7 +38,7 @@ public bool TryDequeueMessage(out byte[] message)
return message != null;
}
- public async Task ConnectAsync(Uri serverUri)
+ public async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken)
{
if (_webSocket != null)
{
diff --git a/Packages/StreamVideo/Runtime/Libs/Websockets/WebsocketClient.cs b/Packages/StreamVideo/Runtime/Libs/Websockets/WebsocketClient.cs
index 28a1bb5d..caa962af 100644
--- a/Packages/StreamVideo/Runtime/Libs/Websockets/WebsocketClient.cs
+++ b/Packages/StreamVideo/Runtime/Libs/Websockets/WebsocketClient.cs
@@ -38,7 +38,7 @@ public WebsocketClient(ILogs logs, Encoding encoding = default, bool isDebugMode
public bool TryDequeueMessage(out byte[] message) => _receiveQueue.TryDequeue(out message);
- public async Task ConnectAsync(Uri serverUri)
+ public async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken)
{
if (IsConnected || IsConnecting)
{
@@ -52,7 +52,8 @@ public async Task ConnectAsync(Uri serverUri)
{
await TryDisposeResourcesAsync(WebSocketCloseStatus.NormalClosure,
"Clean up resources before connecting");
- _connectionCts = new CancellationTokenSource();
+ cancellationToken.ThrowIfCancellationRequested();
+ _connectionCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_internalClient = new ClientWebSocket();
_internalClient.Options.SetRequestHeader("User-Agent", "unity-video-sdk-ws-client");
@@ -78,7 +79,7 @@ await TryDisposeResourcesAsync(WebSocketCloseStatus.NormalClosure,
}
catch (Exception e)
{
- _logs.Exception(e);
+ LogExceptionIfDebugMode(e);
OnConnectionFailed();
throw;
}