diff --git a/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs new file mode 100644 index 00000000..f2cb0616 --- /dev/null +++ b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs @@ -0,0 +1,15 @@ +using System; +using StreamVideo.Core.StatefulModels; + +namespace StreamVideo.Core.Exceptions +{ + /// + /// Exception thrown when trying to join a but another is already joined or currently joining + /// + public class StreamCallInProgressException : Exception + { + public StreamCallInProgressException(string message) : base(message) + { + } + } +} \ No newline at end of file diff --git a/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs.meta b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs.meta new file mode 100644 index 00000000..b928a6ba --- /dev/null +++ b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallInProgressException.cs.meta @@ -0,0 +1,3 @@ +fileFormatVersion: 2 +guid: 79ee770970f149d8b651612c496d6cb4 +timeCreated: 1764062143 \ No newline at end of file diff --git a/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs new file mode 100644 index 00000000..49b94402 --- /dev/null +++ b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs @@ -0,0 +1,16 @@ +using System; +using StreamVideo.Core.StatefulModels; + +namespace StreamVideo.Core.Exceptions +{ + /// + /// Exception thrown when a with provided ID is not found + /// + public class StreamCallNotFoundException : Exception + { + public StreamCallNotFoundException(string message) : base(message) + { + + } + } +} \ No newline at end of file diff --git a/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs.meta b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs.meta new file mode 100644 index 00000000..6962d317 --- /dev/null +++ b/Packages/StreamVideo/Runtime/Core/Exceptions/StreamCallNotFoundException.cs.meta @@ -0,0 +1,3 @@ +fileFormatVersion: 2 +guid: 289d3871066b46ddb644e46fa1b9bc88 +timeCreated: 1764062138 \ No newline at end of file diff --git a/Packages/StreamVideo/Runtime/Core/IStreamVideoClient.cs b/Packages/StreamVideo/Runtime/Core/IStreamVideoClient.cs index 9dcaf148..acd052c7 100644 --- a/Packages/StreamVideo/Runtime/Core/IStreamVideoClient.cs +++ b/Packages/StreamVideo/Runtime/Core/IStreamVideoClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using StreamVideo.Core.QueryBuilders.Sort.Calls; using StreamVideo.Core.DeviceManagers; @@ -67,6 +68,8 @@ public interface IStreamVideoClient : IStreamVideoClientEventsListener, IDisposa /// /// Credentials required to connect user: api_key, user_id, and user_token Task ConnectUserAsync(AuthCredentials credentials); + + Task ConnectUserAsync(AuthCredentials credentials, CancellationToken cancellationToken); /// /// Disconnect user from Stream server. @@ -76,10 +79,19 @@ public interface IStreamVideoClient : IStreamVideoClientEventsListener, IDisposa Task JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring, bool notify); + Task JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring, + bool notify, CancellationToken cancellationToken); + /// - /// Gets call information without joining it. Will return null if the call doesn't exist + /// Gets information without joining it. Will return null if the call doesn't exist /// Task GetCallAsync(StreamCallType callType, string callId); + + /// + /// Gets information without joining it. Will return null if the call doesn't exist + /// + Task GetCallAsync(StreamCallType callType, string callId, + CancellationToken cancellationToken); /// /// Get a call with a specified Type and ID. If such a call doesn't exist, it will be created. @@ -88,6 +100,14 @@ Task JoinCallAsync(StreamCallType callType, string callId, bool cre /// Call ID /// Call object of type: Task GetOrCreateCallAsync(StreamCallType callType, string callId); + + /// + /// Get a call with a specified Type and ID. If such a call doesn't exist, it will be created. + /// + /// Call type - this defines the permissions and other settings for the call. Read more in the Call Types Docs + /// Call ID + /// Call object of type: + Task GetOrCreateCallAsync(StreamCallType callType, string callId, CancellationToken cancellationToken); /// /// Query calls diff --git a/Packages/StreamVideo/Runtime/Core/InternalDTO/Sfu/GeneratedAPI.cs b/Packages/StreamVideo/Runtime/Core/InternalDTO/Sfu/GeneratedAPI.cs index cf3cbd58..e48610ed 100644 --- a/Packages/StreamVideo/Runtime/Core/InternalDTO/Sfu/GeneratedAPI.cs +++ b/Packages/StreamVideo/Runtime/Core/InternalDTO/Sfu/GeneratedAPI.cs @@ -1,3 +1,5 @@ +using System.Threading; + namespace StreamVideo.Core.Sfu { using Signal = StreamVideo.v1.Sfu.Signal; // Generated by protoc-gen-twirpcs. DO NOT EDIT! @@ -46,11 +48,12 @@ private static string parseJSONString(string jsonData, string key) { } private delegate Resp doParsing(byte[] data) where Resp : IMessage; - private static async Task DoRequest(HttpClient client, string address, Req req, doParsing parserFunc) where Req : IMessage where Resp : IMessage { + private static async Task DoRequest(HttpClient client, string address, Req req, doParsing parserFunc, CancellationToken cancellationToken = default) where Req : IMessage where Resp : IMessage { using (var content = new ByteArrayContent(req.ToByteArray())) { content.Headers.ContentType = CONTENT_TYPE_PROTOBUF; - using (HttpResponseMessage response = await client.PostAsync(address, content)) { + using (HttpResponseMessage response = await client.PostAsync(address, content, cancellationToken)) { var byteArr = await response.Content.ReadAsByteArrayAsync(); + cancellationToken.ThrowIfCancellationRequested(); if (!response.IsSuccessStatusCode) { string errorJSON = System.Text.Encoding.UTF8.GetString(byteArr, 0, byteArr.Length); throw createException(errorJSON); @@ -61,44 +64,44 @@ private static async Task DoRequest(HttpClient client, string a } // SetPublisher sends the WebRTC offer for the peer connection used to publish A/V - public static async Task SetPublisher(HttpClient client, Signal.SetPublisherRequest req) { - return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SetPublisher", req, Signal.SetPublisherResponse.Parser.ParseFrom); + public static async Task SetPublisher(HttpClient client, Signal.SetPublisherRequest req, CancellationToken cancellationToken = default) { + return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SetPublisher", req, Signal.SetPublisherResponse.Parser.ParseFrom, cancellationToken); } // answer is sent by the client to the SFU after receiving a subscriber_offer. - public static async Task SendAnswer(HttpClient client, Signal.SendAnswerRequest req) { - return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SendAnswer", req, Signal.SendAnswerResponse.Parser.ParseFrom); + public static async Task SendAnswer(HttpClient client, Signal.SendAnswerRequest req, CancellationToken cancellationToken = default) { + return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SendAnswer", req, Signal.SendAnswerResponse.Parser.ParseFrom, cancellationToken); } // SendICECandidate sends an ICE candidate to the client - public static async Task IceTrickle(HttpClient client, StreamVideo.v1.Sfu.Models.ICETrickle req) { - return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/IceTrickle", req, StreamVideo.v1.Sfu.Signal.ICETrickleResponse.Parser.ParseFrom); + public static async Task IceTrickle(HttpClient client, StreamVideo.v1.Sfu.Models.ICETrickle req, CancellationToken cancellationToken = default) { + return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/IceTrickle", req, StreamVideo.v1.Sfu.Signal.ICETrickleResponse.Parser.ParseFrom, cancellationToken); } // UpdateSubscribers is used to notify the SFU about the list of video subscriptions // TODO: sync subscriptions based on this + update tracks using the dimension info sent by the user - public static async Task UpdateSubscriptions(HttpClient client, Signal.UpdateSubscriptionsRequest req) { - return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateSubscriptions", req, Signal.UpdateSubscriptionsResponse.Parser.ParseFrom); + public static async Task UpdateSubscriptions(HttpClient client, Signal.UpdateSubscriptionsRequest req, CancellationToken cancellationToken = default) { + return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateSubscriptions", req, Signal.UpdateSubscriptionsResponse.Parser.ParseFrom, cancellationToken); } - public static async Task UpdateMuteStates(HttpClient client, Signal.UpdateMuteStatesRequest req) { - return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateMuteStates", req, Signal.UpdateMuteStatesResponse.Parser.ParseFrom); + public static async Task UpdateMuteStates(HttpClient client, Signal.UpdateMuteStatesRequest req, CancellationToken cancellationToken = default) { + return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateMuteStates", req, Signal.UpdateMuteStatesResponse.Parser.ParseFrom, cancellationToken); } - public static async Task IceRestart(HttpClient client, Signal.ICERestartRequest req) { - return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/IceRestart", req, Signal.ICERestartResponse.Parser.ParseFrom); + public static async Task IceRestart(HttpClient client, Signal.ICERestartRequest req, CancellationToken cancellationToken = default) { + return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/IceRestart", req, Signal.ICERestartResponse.Parser.ParseFrom, cancellationToken); } - public static async Task SendStats(HttpClient client, Signal.SendStatsRequest req) { - return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SendStats", req, Signal.SendStatsResponse.Parser.ParseFrom); + public static async Task SendStats(HttpClient client, Signal.SendStatsRequest req, CancellationToken cancellationToken = default) { + return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/SendStats", req, Signal.SendStatsResponse.Parser.ParseFrom, cancellationToken); } - public static async Task StartNoiseCancellation(HttpClient client, Signal.StartNoiseCancellationRequest req) { - return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/StartNoiseCancellation", req, Signal.StartNoiseCancellationResponse.Parser.ParseFrom); + public static async Task StartNoiseCancellation(HttpClient client, Signal.StartNoiseCancellationRequest req, CancellationToken cancellationToken = default) { + return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/StartNoiseCancellation", req, Signal.StartNoiseCancellationResponse.Parser.ParseFrom, cancellationToken); } - public static async Task StopNoiseCancellation(HttpClient client, Signal.StopNoiseCancellationRequest req) { - return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/StopNoiseCancellation", req, Signal.StopNoiseCancellationResponse.Parser.ParseFrom); + public static async Task StopNoiseCancellation(HttpClient client, Signal.StopNoiseCancellationRequest req, CancellationToken cancellationToken = default) { + return await DoRequest(client, "/twirp/stream.video.sfu.signal.SignalServer/StopNoiseCancellation", req, Signal.StopNoiseCancellationResponse.Parser.ParseFrom, cancellationToken); } } } diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/IInternallVideoClientApi.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/IInternallVideoClientApi.cs index 12f22503..659b8943 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/IInternallVideoClientApi.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/IInternallVideoClientApi.cs @@ -1,3 +1,4 @@ +using System.Threading; using System.Threading.Tasks; using StreamVideo.Core.InternalDTO.Models; using StreamVideo.Core.InternalDTO.Requests; @@ -8,13 +9,13 @@ namespace StreamVideo.Core.LowLevelClient.API.Internal internal interface IInternalVideoClientApi { Task GetCallAsync(StreamCallType callType, string callId, - GetOrCreateCallRequestInternalDTO getCallRequest); + GetOrCreateCallRequestInternalDTO getCallRequest, CancellationToken cancellationToken); Task UpdateCallAsync(StreamCallType callType, string callId, UpdateCallRequestInternalDTO updateCallRequest); Task GetOrCreateCallAsync(StreamCallType callType, string callId, - GetOrCreateCallRequestInternalDTO getOrCreateCallRequest); + GetOrCreateCallRequestInternalDTO getOrCreateCallRequest, CancellationToken cancellationToken); Task AcceptCallAsync(StreamCallType callType, string callId); diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalApiClientBase.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalApiClientBase.cs index db33b132..fbb83bd7 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalApiClientBase.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalApiClientBase.cs @@ -1,5 +1,6 @@ using System; using System.Text; +using System.Threading; using System.Threading.Tasks; using StreamVideo.Core.Exceptions; using StreamVideo.Core.InternalDTO.Models; @@ -25,14 +26,18 @@ protected InternalApiClientBase(IHttpClient httpClient, ISerializer serializer, _lowLevelClient = lowLevelClient ?? throw new ArgumentNullException(nameof(lowLevelClient)); } - protected Task Get(string endpoint, TPayload payload) - => HttpRequest(HttpMethodType.Get, endpoint, payload); + //StreamTODO: add cancellation token support to all + + protected Task Get(string endpoint, TPayload payload, + CancellationToken cancellationToken) + => HttpRequest(HttpMethodType.Get, endpoint, payload, cancellationToken: cancellationToken); protected Task Get(string endpoint, QueryParameters parameters = null) => HttpRequest(HttpMethodType.Get, endpoint, queryParameters: parameters); - protected Task Post(string endpoint, TRequest request = default) - => HttpRequest(HttpMethodType.Post, endpoint, request); + protected Task Post(string endpoint, TRequest request = default, + CancellationToken cancellationToken = default) + => HttpRequest(HttpMethodType.Post, endpoint, request, cancellationToken: cancellationToken); protected Task Post(string endpoint, object request = null) => HttpRequest(HttpMethodType.Post, endpoint, request); @@ -74,7 +79,7 @@ private object TrySerializeRequestBodyContent(object content, out string seriali } private async Task HttpRequest(HttpMethodType httpMethod, string endpoint, - object requestBody = default, QueryParameters queryParameters = null) + object requestBody = default, QueryParameters queryParameters = null, CancellationToken cancellationToken = default) { // //StreamTodo: perhaps remove this requirement, sometimes we send empty body without any properties // if (requestBody == null && IsRequestBodyRequiredByHttpMethod(httpMethod)) @@ -95,7 +100,7 @@ private async Task HttpRequest(HttpMethodType httpMethod, LogFutureRequestIfDebug(uri, endpoint, httpMethod, logContent); - var httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent); + var httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent, cancellationToken); var responseContent = httpResponse.Result; if (!httpResponse.IsSuccessStatusCode) @@ -121,6 +126,7 @@ private async Task HttpRequest(HttpMethodType httpMethod, { _logs.Info($"Http request failed due to expired token, connection id: {_lowLevelClient.ConnectionId}"); await _lowLevelClient.DisconnectAsync(); + cancellationToken.ThrowIfCancellationRequested(); } //StreamTodo: Refactor Token refresh logic. This relies on the fact that connecting fetches fresh token. But we can probably replace the token without breaking the connection @@ -135,7 +141,7 @@ private async Task HttpRequest(HttpMethodType httpMethod, while (_lowLevelClient.ConnectionState != ConnectionState.Connected) { i++; - await Task.Delay(1); + await Task.Delay(1, cancellationToken); if (i > maxMsToWait) { @@ -152,7 +158,7 @@ private async Task HttpRequest(HttpMethodType httpMethod, // Recreate the uri to include new connection id uri = _requestUriFactory.CreateEndpointUri(endpoint, queryParameters); - httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent); + httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent, cancellationToken); responseContent = httpResponse.Result; } diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalVideoClientApi.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalVideoClientApi.cs index f8cb69e9..39e9458e 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalVideoClientApi.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/API/Internal/InternalVideoClientApi.cs @@ -1,4 +1,5 @@ -using System.Threading.Tasks; +using System.Threading; +using System.Threading.Tasks; using StreamVideo.Core.InternalDTO.Models; using StreamVideo.Core.InternalDTO.Requests; using StreamVideo.Core.InternalDTO.Responses; @@ -20,17 +21,17 @@ public InternalVideoClientApi(IHttpClient httpClient, ISerializer serializer, IL } public Task GetCallAsync(StreamCallType callType, string callId, - GetOrCreateCallRequestInternalDTO getCallRequest) - => Get($"/call/{callType}/{callId}", getCallRequest); + GetOrCreateCallRequestInternalDTO getCallRequest, CancellationToken cancellationToken) + => Get($"/call/{callType}/{callId}", getCallRequest, cancellationToken); public Task UpdateCallAsync(StreamCallType callType, string callId, UpdateCallRequestInternalDTO updateCallRequest) => Patch($"/call/{callType}/{callId}", updateCallRequest); public Task GetOrCreateCallAsync(StreamCallType callType, string callId, - GetOrCreateCallRequestInternalDTO getOrCreateCallRequest) + GetOrCreateCallRequestInternalDTO getOrCreateCallRequest, CancellationToken cancellationToken) => Post($"/call/{callType}/{callId}", - getOrCreateCallRequest); + getOrCreateCallRequest, cancellationToken); public Task AcceptCallAsync(StreamCallType callType, string callId) => Post($"/call/{callType}/{callId}/accept"); diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/IStreamVideoLowLevelClient.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/IStreamVideoLowLevelClient.cs index 197d096f..7950b98c 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/IStreamVideoLowLevelClient.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/IStreamVideoLowLevelClient.cs @@ -57,6 +57,6 @@ Task ConnectUserAsync(string apiKey, string userId, string userToken, Task ConnectUserAsync(string apiKey, string userId, ITokenProvider tokenProvider, CancellationToken cancellationToken = default); - Task GetLocationHintAsync(); + Task GetLocationHintAsync(CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectScheduler.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectScheduler.cs index f0be759d..a812779c 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectScheduler.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectScheduler.cs @@ -8,6 +8,8 @@ internal interface IReconnectScheduler { double? NextReconnectTime { get; } event Action ReconnectionScheduled; + + void Reset(); } /// /// Schedules next reconnection time based on the past attempts and network availability @@ -43,8 +45,9 @@ private set } public ReconnectScheduler(ITimeService timeService, IStreamVideoLowLevelClient lowLevelClient, - INetworkMonitor networkMonitor) + INetworkMonitor networkMonitor, Func shouldReconnect) { + _shouldReconnect = shouldReconnect ?? throw new ArgumentNullException(nameof(shouldReconnect)); _client = lowLevelClient ?? throw new ArgumentNullException(nameof(lowLevelClient)); _timeService = timeService ?? throw new ArgumentNullException(nameof(timeService)); _networkMonitor = networkMonitor ?? throw new ArgumentNullException(nameof(networkMonitor)); @@ -98,6 +101,12 @@ void ThrowIfLessOrEqualToZero(float value, string name) } } + public void Reset() + { + NextReconnectTime = default; + _reconnectAttempts = 0; + } + public void Stop() { NextReconnectTime = float.MaxValue; @@ -112,6 +121,7 @@ public void Stop() private int _reconnectAttempts; private bool _isStopped; private double? _nextReconnectTime; + private Func _shouldReconnect; private void TryScheduleNextReconnectTime() { @@ -120,7 +130,7 @@ private void TryScheduleNextReconnectTime() return; } - if (_isStopped || ReconnectStrategy == ReconnectStrategy.Never) + if (_isStopped || ReconnectStrategy == ReconnectStrategy.Never || !_shouldReconnect()) { return; } @@ -151,6 +161,7 @@ private void TryScheduleNextReconnectTime() } NextReconnectTime = GetNextReconnectTime(); + _reconnectAttempts++; } private void OnConnectionStateChanged(ConnectionState previous, ConnectionState current) diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs index 276d8451..68509a7e 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs @@ -7,6 +7,7 @@ using System.Net.Http; using System.Net.WebSockets; using System.Text.RegularExpressions; +using System.Threading; using System.Threading.Tasks; using StreamVideo.v1.Sfu.Events; using StreamVideo.v1.Sfu.Models; @@ -84,6 +85,7 @@ internal sealed class RtcSession : IMediaInputProvider, IDisposable public event Action PublisherAudioTrackChanged; public event Action PublisherVideoTrackChanged; + public event Action PeerConnectionDisconnectedDuringSession; public bool PublisherAudioTrackIsEnabled { @@ -212,6 +214,16 @@ public Camera VideoSceneInput } #endregion + + public bool ShouldSfuAttemptToReconnect() + { + if (CallState != CallingState.Joined && CallState != CallingState.Joining) + { + return false; + } + + return !GetCurrentCancellationTokenOrDefault().IsCancellationRequested; + } public string SessionId { get; private set; } = "(empty)"; @@ -270,10 +282,11 @@ public void Update() TryExecuteSubscribeToTracks(); } - public async Task SendWebRtcStats(SendStatsRequest request) + public async Task SendWebRtcStats(SendStatsRequest request, CancellationToken cancellationToken) { var response = await RpcCallAsync(request, GeneratedAPI.SendStats, - nameof(GeneratedAPI.SendStats), response => response.Error, postLog: LogWebRTCStats); + nameof(GeneratedAPI.SendStats), cancellationToken, response => response.Error, + postLog: LogWebRTCStats); if (ActiveCall == null) { @@ -298,7 +311,40 @@ public async Task SendWebRtcStats(SendStatsRequest request) //StreamTodo: solve this dependency better public void SetCache(ICache cache) => _cache = cache; - public async Task StartAsync(StreamCall call) + private void ValidateCallCredentialsOrThrow(IStreamCall call) + { + if (call.Credentials == null) + { + throw new ArgumentNullException(nameof(call.Credentials)); + } + + if (call.Credentials.Server == null) + { + throw new ArgumentNullException(nameof(call.Credentials.Server)); + } + + if (string.IsNullOrEmpty(call.Credentials.Server.Url)) + { + throw new ArgumentNullException(nameof(call.Credentials.Server.Url)); + } + + if (string.IsNullOrEmpty(call.Credentials.Token)) + { + throw new ArgumentNullException(nameof(call.Credentials.Token)); + } + + if (call.Credentials.IceServers == null) + { + throw new ArgumentNullException(nameof(call.Credentials.IceServers)); + } + + if (call.Credentials.IceServers.Count == 0) + { + throw new ArgumentException("At least one ICE server must be provided in call credentials."); + } + } + + public async Task StartAsync(StreamCall call, CancellationToken cancellationToken = default) { if (ActiveCall != null) { @@ -306,18 +352,45 @@ public async Task StartAsync(StreamCall call) $"Cannot start new session until previous call is active. Active call: {ActiveCall}"); } + if (call == null) + { + throw new ArgumentNullException(nameof(call)); + } + try { + _logs.Info($"Start joining a call: type={call.Type}, id={call.Id}"); + //StreamTodo: perhaps not necessary here ClearSession(); + if (_joinCallCts != null) + { + _logs.ErrorIfDebug("Previous join call CTS was not cleaned up properly. Cancelling it now."); + _joinCallCts.Cancel(); + _joinCallCts.Dispose(); + _joinCallCts = null; + } + + if (_activeCallCts != null) + { + _logs.ErrorIfDebug("Previous active call CTS was not cleaned up properly. Cancelling it now."); + _activeCallCts.Cancel(); + _activeCallCts.Dispose(); + _activeCallCts = null; + } + + _joinCallCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + SubscribeToSfuEvents(); - ActiveCall = call ?? throw new ArgumentNullException(nameof(call)); + ActiveCall = call; _httpClient = _httpClientFactory(ActiveCall); CallState = CallingState.Joining; + ValidateCallCredentialsOrThrow(ActiveCall); + var sfuUrl = call.Credentials.Server.Url; var sfuToken = call.Credentials.Token; var iceServers = call.Credentials.IceServers; @@ -339,16 +412,30 @@ public async Task StartAsync(StreamCall call) #endif // We don't set initial offer as local. Later on we set generated answer as a local - var offer = await Subscriber.CreateOfferAsync(); + var offer = await Subscriber.CreateOfferAsync(_joinCallCts.Token); + + if (string.IsNullOrEmpty(offer.sdp)) + { + throw new ArgumentException("Generated offer SDP is null or empty"); + } _sfuWebSocket.SetSessionData(SessionId, offer.sdp, sfuUrl, sfuToken); - await _sfuWebSocket.ConnectAsync(); + await _sfuWebSocket.ConnectAsync(cancellationToken); + // Wait for call to be joined with timeout + const int joinTimeoutSeconds = 30; + var joinStartTime = _timeService.Time; while (CallState != CallingState.Joined) { - //StreamTodo: implement a timeout if something goes wrong - //StreamTodo: implement cancellation token - await Task.Delay(1); + await Task.Delay(1, cancellationToken); + cancellationToken.ThrowIfCancellationRequested(); + + var elapsedTime = _timeService.Time - joinStartTime; + if (elapsedTime > joinTimeoutSeconds) + { + throw new TimeoutException( + $"Failed to join call within {joinTimeoutSeconds} seconds. Current state: {CallState}"); + } } // Wait for SFU connected to receive track prefix @@ -357,7 +444,7 @@ public async Task StartAsync(StreamCall call) CreatePublisher(iceServers); } - await SubscribeToTracksAsync(); + await SubscribeToTracksAsync(cancellationToken); if (UseNativeAudioBindings) { @@ -370,36 +457,71 @@ public async Task StartAsync(StreamCall call) //StreamTodo: validate when this state should set CallState = CallingState.Joined; + _activeCallCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + _logs.Info($"Joined call: type={call.Type}, id={call.Id}"); + #if STREAM_DEBUG_ENABLED _videoAudioSyncBenchmark?.Init(call); #endif } - catch + catch(OperationCanceledException) + { + ClearSession(); + throw; + } + catch(Exception e) { + _logs.Exception(e); ClearSession(); throw; } + finally + { + if (_joinCallCts != null) + { + _joinCallCts.Dispose(); + _joinCallCts = null; + } + } } public async Task StopAsync(string reason = "") { + if (UseNativeAudioBindings) + { +#if STREAM_NATIVE_AUDIO + WebRTC.StopAudioPlayback(); +#endif + } + if (CallState == CallingState.Leaving || CallState == CallingState.Offline) { + //StreamTODO: should this return a task of the ongoing stop? return; } + if (CallState != CallingState.Joined && CallState != CallingState.Joining) + { + throw new InvalidOperationException( + "Tried to leave call that is not joined or joining. Current state: " + CallState); + } + CallState = CallingState.Leaving; + if (_joinCallCts != null) + { + _joinCallCts.Cancel(); + } - if (UseNativeAudioBindings) + if (_activeCallCts != null) { -#if STREAM_NATIVE_AUDIO - WebRTC.StopAudioPlayback(); -#endif + _activeCallCts.Cancel(); } if (ActiveCall != null) { + _logs.Info("Leaving call..."); try { // Trace leave call before leaving the call. Otherwise, stats are not send because SFU WS disconnects @@ -407,9 +529,11 @@ public async Task StopAsync(string reason = "") if (_statsSender != null) // This was null in tests { + var sendStatsCancellationToken = new CancellationTokenSource(); + sendStatsCancellationToken.CancelAfter(400); using (new TimeLogScope("Sending final stats on leave", _logs.Info)) { - await _statsSender.SendFinalStatsAsync(); + await _statsSender.SendFinalStatsAsync(sendStatsCancellationToken.Token); } } } @@ -417,24 +541,16 @@ public async Task StopAsync(string reason = "") { _logs.Warning($"Failed to send final stats on leave: {e.Message}"); } - -#if STREAM_DEBUG_ENABLED - if (_sfuWebSocket.SendQueueCount > 0) - { - _logs.Error( - $"Waited for 300+ ms for SFU messages to be sent. Remaining: {_sfuWebSocket.SendQueueCount}"); - } -#endif } ClearSession(); - //StreamTodo: check with js definition of "offline" using (new TimeLogScope("Sending leave call request & disconnect", _logs.Info)) { await _sfuWebSocket.DisconnectAsync(WebSocketCloseStatus.NormalClosure, reason); } + //StreamTodo: check with js definition of "offline" CallState = CallingState.Offline; #if STREAM_DEBUG_ENABLED @@ -561,6 +677,9 @@ private readonly Dictionary _videoResolutionByParticipa private MicrophoneDeviceInfo _activeAudioRecordingDevice; + private CancellationTokenSource _joinCallCts; + private CancellationTokenSource _activeCallCts; + private void ClearSession() { UnsubscribeFromSfuEvents(); @@ -578,10 +697,32 @@ private void ClearSession() CallState = CallingState.Unknown; _httpClient = null; + if (_joinCallCts != null) + { + _joinCallCts.Dispose(); + _joinCallCts = null; + } + + if (_activeCallCts != null) + { + _activeCallCts.Dispose(); + _activeCallCts = null; + } + _trackSubscriptionRequested = false; _trackSubscriptionRequestInProgress = false; } + private CancellationToken GetCurrentCancellationTokenOrDefault() + { + if (_activeCallCts != null) + { + return _activeCallCts.Token; + } + + return _joinCallCts?.Token ?? default; + } + //StreamTodo: request track subscriptions when SFU got changed. Android comment for setVideoSubscriptions: /* * - it sends the resolutions we're displaying the video at so the SFU can decide which track to send @@ -615,7 +756,16 @@ private void TryExecuteSubscribeToTracks() return; } - SubscribeToTracksAsync().LogIfFailed(); + //StreamTODO: add cancellation token support + SubscribeToTracksAsync(GetCurrentCancellationTokenOrDefault()).ContinueWith(t => + { + if (ActiveCall == null) + { + return; + } + + t.LogIfFailed(); + }); _lastTrackSubscriptionRequestTime = _timeService.Time; _trackSubscriptionRequested = false; @@ -624,7 +774,8 @@ private void TryExecuteSubscribeToTracks() /// /// Request this via . We don't want to call it too often /// - private async Task SubscribeToTracksAsync() + /// + private async Task SubscribeToTracksAsync(CancellationToken cancellationToken) { if (ActiveCall?.Participants == null || !ActiveCall.Participants.Any()) { @@ -658,7 +809,7 @@ private async Task SubscribeToTracksAsync() #endif var response = await RpcCallAsync(request, GeneratedAPI.UpdateSubscriptions, - nameof(GeneratedAPI.UpdateSubscriptions), response => response.Error); + nameof(GeneratedAPI.UpdateSubscriptions), cancellationToken, response => response.Error); if (ActiveCall == null) { @@ -768,8 +919,9 @@ private async Task SendIceCandidateAsync(RTCIceCandidate candidate, StreamPeerTy if (_callState == CallingState.Joined) { + var cancellationToken = GetCurrentCancellationTokenOrDefault(); await RpcCallAsync(iceTrickle, GeneratedAPI.IceTrickle, nameof(GeneratedAPI.IceTrickle), - response => response.Error); + cancellationToken, response => response.Error); } else { @@ -810,6 +962,8 @@ private void UpdateAudioRecording() private void OnSfuJoinResponse(JoinResponse joinResponse) { + //StreamTODO: what if left the call and started a new one but the JoinResponse belongs to the previous session? + _sfuTracer?.Trace(PeerConnectionTraceKey.JoinRequest, joinResponse); _logs.InfoIfDebug($"Handle Sfu {nameof(JoinResponse)}"); ActiveCall.UpdateFromSfu(joinResponse); @@ -820,10 +974,16 @@ private void OnSfuJoinedCall() { CallState = CallingState.Joined; + var cancellationToken = GetCurrentCancellationTokenOrDefault(); foreach (var iceTrickle in _pendingIceTrickleRequests) { + if (cancellationToken.IsCancellationRequested) + { + return; + } + RpcCallAsync(iceTrickle, GeneratedAPI.IceTrickle, nameof(GeneratedAPI.IceTrickle), - response => response.Error).LogIfFailed(); + cancellationToken, response => response.Error).LogIfFailed(); } } @@ -865,6 +1025,11 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer) try { + if (GetCurrentCancellationTokenOrDefault().IsCancellationRequested) + { + return; + } + //StreamTodo: handle subscriberOffer.iceRestart var rtcSessionDescription = new RTCSessionDescription { @@ -874,7 +1039,7 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer) try { - await Subscriber.SetRemoteDescriptionAsync(rtcSessionDescription); + await Subscriber.SetRemoteDescriptionAsync(rtcSessionDescription, GetCurrentCancellationTokenOrDefault()); Subscriber.ThrowDisposedDuringOperationIfNull(); } catch (Exception e) @@ -884,14 +1049,14 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer) throw; } - var answer = await Subscriber.CreateAnswerAsync(); + var answer = await Subscriber.CreateAnswerAsync(GetCurrentCancellationTokenOrDefault()); Subscriber.ThrowDisposedDuringOperationIfNull(); //StreamTodo: mangle SDP try { - await Subscriber.SetLocalDescriptionAsync(ref answer); + await Subscriber.SetLocalDescriptionAsync(ref answer, GetCurrentCancellationTokenOrDefault()); Subscriber.ThrowDisposedDuringOperationIfNull(); } catch (Exception e) @@ -909,11 +1074,15 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer) }; await RpcCallAsync(sendAnswerRequest, GeneratedAPI.SendAnswer, nameof(GeneratedAPI.SendAnswer), - response => response.Error, - preLog: true); + GetCurrentCancellationTokenOrDefault(), response => response.Error, preLog: true); + } + catch (OperationCanceledException) + { + // Expected during shutdown, don't log as error } catch (DisposedDuringOperationException) { + // Expected during shutdown } catch (Exception e) { @@ -930,11 +1099,14 @@ private void OnSfuTrackUnpublished(TrackUnpublished trackUnpublished) var sessionId = trackUnpublished.SessionId; var type = trackUnpublished.Type.ToPublicEnum(); var cause = trackUnpublished.Cause; + + // StreamTODO: test if this works well with other user muting this user + var updateLocalParticipantState = cause != TrackUnpublishReason.Unspecified && cause != TrackUnpublishReason.UserMuted; // Optionally available. Read TrackUnpublished.participant comment in events.proto var participantSfuDto = trackUnpublished.Participant; - UpdateParticipantTracksState(userId, sessionId, type, isEnabled: false, out var participant); + UpdateParticipantTracksState(userId, sessionId, type, isEnabled: false, updateLocalParticipantState, out var participant); if (participantSfuDto != null && participant != null) { @@ -955,7 +1127,7 @@ private void OnSfuTrackPublished(TrackPublished trackPublished) // Optionally available. Read TrackUnpublished.participant comment in events.proto var participantSfuDto = trackPublished.Participant; - UpdateParticipantTracksState(userId, sessionId, type, isEnabled: true, out var participant); + UpdateParticipantTracksState(userId, sessionId, type, isEnabled: true, updateLocalParticipantState: true, out var participant); if (participantSfuDto != null && participant != null) { @@ -968,7 +1140,7 @@ private void OnSfuTrackPublished(TrackPublished trackPublished) } private void UpdateParticipantTracksState(string userId, string sessionId, TrackType trackType, bool isEnabled, - out StreamVideoCallParticipant participant) + bool updateLocalParticipantState, out StreamVideoCallParticipant participant) { participant = (StreamVideoCallParticipant)ActiveCall.Participants.FirstOrDefault(p => p.SessionId == sessionId); @@ -978,7 +1150,7 @@ private void UpdateParticipantTracksState(string userId, string sessionId, Track return; } - if (participant.IsLocalParticipant) + if (participant.IsLocalParticipant && updateLocalParticipantState) { //StreamTODO: most probably expose RtcSession TrackStateChanged event so that AudioDeviceManager can subscribe @@ -1015,18 +1187,20 @@ private async Task UpdateMuteStateAsync(TrackType trackType, bool isEnabled) return; } + var cancellationToken = _joinCallCts?.Token ?? default; await RpcCallAsync(new UpdateMuteStatesRequest - { - SessionId = SessionId, - MuteStates = { - new TrackMuteState + SessionId = SessionId, + MuteStates = { - TrackType = trackType.ToInternalEnum(), - Muted = !isEnabled + new TrackMuteState + { + TrackType = trackType.ToInternalEnum(), + Muted = !isEnabled + } } - } - }, GeneratedAPI.UpdateMuteStates, nameof(GeneratedAPI.UpdateSubscriptions), response => response.Error); + }, GeneratedAPI.UpdateMuteStates, nameof(GeneratedAPI.UpdateSubscriptions), cancellationToken, + response => response.Error); } private void InternalExecuteSetPublisherAudioTrackEnabled(bool isEnabled) @@ -1205,8 +1379,9 @@ private void OnSfuWebSocketOnCallEnded() //StreamTodo: implement retry strategy like in Android SDK //If possible, take into account if we the update is still valid e.g. private async Task RpcCallAsync(TRequest request, - Func> rpcCallAsync, string debugRequestName, - Func getError, bool preLog = false, + Func> rpcCallAsync, string debugRequestName, + CancellationToken cancellationToken, Func getError, + bool preLog = false, bool postLog = true) { //StreamTodo: use rpcCallAsync.GetMethodInfo().Name; instead debugRequestName @@ -1236,7 +1411,8 @@ var errorMsg } #endif - var response = await rpcCallAsync(_httpClient, request); + // StreamTODO: Add multiple retries + var response = await rpcCallAsync(_httpClient, request, cancellationToken); if (!skipTracing) { @@ -1322,7 +1498,12 @@ private async void OnPublisherNegotiationNeeded() $"{nameof(Publisher.SignalingState)} state is not stable, current state: {Publisher.SignalingState}"); } - var offer = await Publisher.CreateOfferAsync(); + if(GetCurrentCancellationTokenOrDefault().IsCancellationRequested) + { + return; + } + + var offer = await Publisher.CreateOfferAsync(GetCurrentCancellationTokenOrDefault()); Publisher.ThrowDisposedDuringOperationIfNull(); //StreamTOodo: check if SDP is null or empty (this would throw an exception during setting) @@ -1344,7 +1525,7 @@ private async void OnPublisherNegotiationNeeded() try { - await Publisher.SetLocalDescriptionAsync(ref offer); + await Publisher.SetLocalDescriptionAsync(ref offer, GetCurrentCancellationTokenOrDefault()); Publisher.ThrowDisposedDuringOperationIfNull(); } catch (Exception e) @@ -1383,8 +1564,9 @@ private async void OnPublisherNegotiationNeeded() _logs.Warning($"SetPublisherRequest:\n{serializedRequest}"); #endif + //StreamTODO: add cancellation token support var result = await RpcCallAsync(request, GeneratedAPI.SetPublisher, nameof(GeneratedAPI.SetPublisher), - response => response.Error); + GetCurrentCancellationTokenOrDefault(), response => response.Error); Publisher.ThrowDisposedDuringOperationIfNull(); #if STREAM_DEBUG_ENABLED @@ -1397,7 +1579,7 @@ await Publisher.SetRemoteDescriptionAsync(new RTCSessionDescription() { type = RTCSdpType.Answer, sdp = result.Sdp - }); + }, GetCurrentCancellationTokenOrDefault()); } catch (Exception e) { @@ -1406,8 +1588,13 @@ await Publisher.SetRemoteDescriptionAsync(new RTCSessionDescription() throw; } } + catch (OperationCanceledException) + { + // Expected during shutdown, don't log as error + } catch (DisposedDuringOperationException) { + // Expected during shutdown } catch (Exception e) { @@ -1666,6 +1853,7 @@ private void CreatePublisher(IEnumerable iceServers) Publisher.NegotiationNeeded += OnPublisherNegotiationNeeded; Publisher.PublisherAudioTrackChanged += OnPublisherAudioTrackChanged; Publisher.PublisherVideoTrackChanged += OnPublisherVideoTrackChanged; + Publisher.Disconnected += PublisherOnDisconnected; Publisher.InitPublisherTracks(); @@ -1681,6 +1869,7 @@ private void DisposePublisher() Publisher.NegotiationNeeded -= OnPublisherNegotiationNeeded; Publisher.PublisherAudioTrackChanged -= OnPublisherAudioTrackChanged; Publisher.PublisherVideoTrackChanged -= OnPublisherVideoTrackChanged; + Publisher.Disconnected += PublisherOnDisconnected; Publisher.Dispose(); Publisher = null; } @@ -1696,6 +1885,14 @@ private void OnPublisherVideoTrackChanged(VideoStreamTrack videoTrack) { PublisherVideoTrackChanged?.Invoke(); } + + void PublisherOnDisconnected() + { + if (CallState == CallingState.Joined || CallState == CallingState.Joining) + { + PeerConnectionDisconnectedDuringSession?.Invoke(); + } + } private void OnSfuWebSocketDisconnected() { @@ -1715,6 +1912,7 @@ private static bool AssertCallIdMatch(IStreamCall activeCall, string callId, ILo return true; } + private void SubscribeToSfuEvents() { diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamPeerConnection.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamPeerConnection.cs index 98967ca8..e2c1aa66 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamPeerConnection.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamPeerConnection.cs @@ -6,6 +6,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using StreamVideo.Core.Configs; using StreamVideo.Core.Models; @@ -30,6 +31,8 @@ internal class StreamPeerConnection : IDisposable public event Action PublisherVideoTrackChanged; public event Action PublisherAudioTrackChanged; + public event Action Disconnected; + public bool IsRemoteDescriptionAvailable { get @@ -153,20 +156,21 @@ public void InitPublisherTracks() public void RestartIce() => _peerConnection.RestartIce(); - public Task SetLocalDescriptionAsync(ref RTCSessionDescription offer) + public Task SetLocalDescriptionAsync(ref RTCSessionDescription offer, + CancellationToken cancellationToken) { #if STREAM_DEBUG_ENABLED _logs.Warning($"[{_peerType}] Set LocalDesc:\n" + offer.sdp); #endif _tracer?.Trace(PeerConnectionTraceKey.SetLocalDescription, offer.sdp); - return _peerConnection.SetLocalDescriptionAsync(ref offer); + return _peerConnection.SetLocalDescriptionAsync(ref offer, cancellationToken); } - public async Task SetRemoteDescriptionAsync(RTCSessionDescription offer) + public async Task SetRemoteDescriptionAsync(RTCSessionDescription offer, CancellationToken cancellationToken) { _tracer?.Trace(PeerConnectionTraceKey.SetRemoteDescription, offer.sdp); - await _peerConnection.SetRemoteDescriptionAsync(ref offer); + await _peerConnection.SetRemoteDescriptionAsync(ref offer, cancellationToken); #if STREAM_DEBUG_ENABLED _logs.Warning( @@ -200,16 +204,16 @@ public void AddIceCandidate(RTCIceCandidateInit iceCandidateInit) _peerConnection.AddIceCandidate(iceCandidate); } - public async Task CreateOfferAsync() + public async Task CreateOfferAsync(CancellationToken cancellationToken) { - var offer = await _peerConnection.CreateOfferAsync(); + var offer = await _peerConnection.CreateOfferAsync(cancellationToken); _tracer?.Trace(PeerConnectionTraceKey.CreateOffer, offer.sdp); return offer; } - public async Task CreateAnswerAsync() + public async Task CreateAnswerAsync(CancellationToken cancellationToken) { - var answer = await _peerConnection.CreateAnswerAsync(); + var answer = await _peerConnection.CreateAnswerAsync(cancellationToken); _tracer?.Trace(PeerConnectionTraceKey.CreateAnswer, answer.sdp); return answer; } @@ -225,7 +229,7 @@ public void Update() } } - public Task GetStatsReportAsync() => _peerConnection.GetStatsAsync(); + public Task GetStatsReportAsync(CancellationToken cancellationToken) => _peerConnection.GetStatsAsync(cancellationToken); public PublisherVideoSettings GetLatestVideoSettings() { @@ -401,6 +405,11 @@ private void OnConnectionStateChange(RTCPeerConnectionState state) _logs.Warning($"[{_peerType}] OnConnectionStateChange to: {state}"); #endif _tracer?.Trace(PeerConnectionTraceKey.OnConnectionStateChange, state.ToString()); + + if (state == RTCPeerConnectionState.Disconnected) + { + Disconnected?.Invoke(); + } } private void OnTrack(RTCTrackEvent trackEvent) diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamVideoLowLevelClient.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamVideoLowLevelClient.cs index 79e13826..47840ce5 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamVideoLowLevelClient.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamVideoLowLevelClient.cs @@ -143,8 +143,8 @@ public StreamVideoLowLevelClient(IWebsocketClient coordinatorWebSocket, IWebsock } //StreamTodo: move to factory - var coordinatorReconnect = new ReconnectScheduler(_timeService, this, _networkMonitor); - var sfuReconnect = new ReconnectScheduler(_timeService, this, _networkMonitor); + var coordinatorReconnect = new ReconnectScheduler(_timeService, this, _networkMonitor, shouldReconnect: () => true); + var sfuReconnect = new ReconnectScheduler(_timeService, this, _networkMonitor, shouldReconnect: () => RtcSession.ShouldSfuAttemptToReconnect()); //StreamTodo: move to factory _coordinatorWS = new CoordinatorWebSocket(coordinatorWebSocket, coordinatorReconnect, authProvider: this, @@ -189,7 +189,7 @@ public async Task ConnectUserAsync(string apiKey, string userId, string userToke #endif await _coordinatorWS.ConnectAsync(cancellationToken); - await UpdateLocationHintAsync(); + await UpdateLocationHintAsync(cancellationToken); Connected?.Invoke(); } @@ -222,12 +222,12 @@ public void Update() RtcSession.Update(); } - public async Task GetLocationHintAsync() + public async Task GetLocationHintAsync(CancellationToken cancellationToken = default) { if (_locationHint.IsNullOrEmpty()) { _logs.Warning("No location hint - retrying to fetch it"); - await UpdateLocationHintAsync(); + await UpdateLocationHintAsync(cancellationToken); } // StreamTodo: attempt to get location hint if not fetched already + perhaps there's an ongoing request and we can just wait if (_locationHint.IsNullOrEmpty()) @@ -297,7 +297,7 @@ public void Dispose() internal IInternalVideoClientApi InternalVideoClientApi { get; } internal RtcSession RtcSession { get; } - internal Task StartCallSessionAsync(StreamCall call) => RtcSession.StartAsync(call); + internal Task StartCallSessionAsync(StreamCall call, CancellationToken cancellationToken) => RtcSession.StartAsync(call, cancellationToken); //internal Task StopCallSessionAsync() => RtcSession.StopAsync(); //StreamTodo: remove @@ -366,10 +366,10 @@ private void OnSfuWebSocketConnectionStateChanged(ConnectionState previous, Conn //StreamTodo: cancellation token //StreamTodo: make few attempts + can be awaited by the JoinCallAsync + support reconnections - private async Task UpdateLocationHintAsync() + private async Task UpdateLocationHintAsync(CancellationToken cancellationToken) { var headers = new List>>(); - await _httpClient.HeadAsync(LocationHintWebUri, headers); + await _httpClient.HeadAsync(LocationHintWebUri, headers, cancellationToken); var locationHeader = headers.FirstOrDefault(_ => _.Key.ToLower() == LocationHintHeaderKey); if (locationHeader.Key.IsNullOrEmpty() || !locationHeader.Value.Any()) diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/UnityWebRtcWrapperExtensions.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/UnityWebRtcWrapperExtensions.cs index 802ec06a..f20ce6f4 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/UnityWebRtcWrapperExtensions.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/UnityWebRtcWrapperExtensions.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using Unity.WebRTC; @@ -8,35 +9,36 @@ namespace StreamVideo.Core.LowLevelClient internal static class UnityWebRtcWrapperExtensions { //StreamTodo: in webRTC example they also check for _peerConnection.SignalingState != RTCSignalingState.Stable - public static Task CreateOfferAsync(this RTCPeerConnection peerConnection) - => WaitForOperationAsync(peerConnection.CreateOffer(), r => r.Desc); + public static Task CreateOfferAsync(this RTCPeerConnection peerConnection, CancellationToken cancellationToken) + => WaitForOperationAsync(peerConnection.CreateOffer(), r => r.Desc, cancellationToken); public static Task CreateOfferAsync(this RTCPeerConnection peerConnection, - ref RTCOfferAnswerOptions options) - => WaitForOperationAsync(peerConnection.CreateOffer(ref options), r => r.Desc); + ref RTCOfferAnswerOptions options, CancellationToken cancellationToken) + => WaitForOperationAsync(peerConnection.CreateOffer(ref options), r => r.Desc, cancellationToken); - public static Task CreateAnswerAsync(this RTCPeerConnection peerConnection) - => WaitForOperationAsync(peerConnection.CreateAnswer(), r => r.Desc); + public static Task CreateAnswerAsync(this RTCPeerConnection peerConnection, CancellationToken cancellationToken) + => WaitForOperationAsync(peerConnection.CreateAnswer(), r => r.Desc, cancellationToken); public static Task SetLocalDescriptionAsync(this RTCPeerConnection peerConnection, - ref RTCSessionDescription desc) - => WaitForOperationAsync(peerConnection.SetLocalDescription(ref desc)); + ref RTCSessionDescription desc, CancellationToken cancellationToken) + => WaitForOperationAsync(peerConnection.SetLocalDescription(ref desc), cancellationToken); public static Task SetRemoteDescriptionAsync(this RTCPeerConnection peerConnection, - ref RTCSessionDescription desc) - => WaitForOperationAsync(peerConnection.SetRemoteDescription(ref desc)); + ref RTCSessionDescription desc, CancellationToken cancellationToken) + => WaitForOperationAsync(peerConnection.SetRemoteDescription(ref desc), cancellationToken); - public static Task GetStatsAsync(this RTCPeerConnection peerConnection) - => WaitForOperationAsync(peerConnection.GetStats(), r => r.Value); + public static Task GetStatsAsync(this RTCPeerConnection peerConnection, CancellationToken cancellationToken) + => WaitForOperationAsync(peerConnection.GetStats(), r => r.Value, cancellationToken); private static async Task WaitForOperationAsync( - this TOperation asyncOperation, Func response) + this TOperation asyncOperation, Func response, CancellationToken cancellationToken) where TOperation : AsyncOperationBase { // StreamTodo: refactor to use coroutine runner to reduce runtime allocations while (!asyncOperation.IsDone) { - await Task.Delay(1); + cancellationToken.ThrowIfCancellationRequested(); + await Task.Delay(1, cancellationToken); } if (asyncOperation.IsError) @@ -48,13 +50,14 @@ private static async Task WaitForOperationAsync( - this TOperation asyncOperation) + this TOperation asyncOperation, CancellationToken cancellationToken) where TOperation : AsyncOperationBase { // StreamTodo: refactor to use coroutine runner to reduce runtime allocations while (!asyncOperation.IsDone) { - await Task.Delay(1); + cancellationToken.ThrowIfCancellationRequested(); + await Task.Delay(1, cancellationToken); } if (asyncOperation.IsError) diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/BasePersistentWebSocket.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/BasePersistentWebSocket.cs index c49b1385..cbab6449 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/BasePersistentWebSocket.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/BasePersistentWebSocket.cs @@ -22,6 +22,8 @@ internal abstract class BasePersistentWebSocket : IPersistentWebSocket public event Action Connected; public event Action Disconnected; + public const int ConnectTimeoutMs = 1000; + public ConnectionState ConnectionState { get => _connectionState; @@ -78,7 +80,9 @@ private void TryToReconnect() Logs.Info($"{GetType()} TryToReconnect"); #endif - ConnectAsync().LogIfFailed(); + var cts = new CancellationTokenSource(); + cts.CancelAfter(ConnectTimeoutMs); + ConnectAsync(cts.Token).LogIfFailed(); } public Task ConnectAsync(CancellationToken cancellationToken = default) @@ -105,7 +109,7 @@ public async Task DisconnectAsync(WebSocketCloseStatus closeStatus, string close } ConnectionState = ConnectionState.Disconnecting; - + await OnDisconnectingAsync(closeMessage); if (WebsocketClient == null) @@ -224,6 +228,7 @@ protected void OnHealthCheckReceived() protected virtual Task OnDisconnectingAsync(string closeMessage) { + _reconnectScheduler.Reset(); return Task.CompletedTask; } @@ -338,6 +343,11 @@ private TEvent DeserializeEvent(string content, out TDto dto) private void OnReconnectionScheduled() { + if (!NextReconnectTime.HasValue) + { + throw new ArgumentNullException(nameof(NextReconnectTime)); + } + ConnectionState = ConnectionState.WaitToReconnect; var timeLeft = NextReconnectTime.Value - TimeService.Time; diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/CoordinatorWebSocket.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/CoordinatorWebSocket.cs index e1e52685..1bec2312 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/CoordinatorWebSocket.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/CoordinatorWebSocket.cs @@ -73,7 +73,7 @@ protected override async Task ExecuteConnectAsync(CancellationToken cancellation var uri = UriFactory.CreateCoordinatorConnectionUri(); //StreamTodo: Add cancel token support to WS - await WebsocketClient.ConnectAsync(uri); + await WebsocketClient.ConnectAsync(uri, cancellationToken); #if STREAM_DEBUG_ENABLED Logs.Info("Coordinator connected! Let's send the connect message"); diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs index 9d22123e..2a907622 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs @@ -179,7 +179,7 @@ protected override async Task ExecuteConnectAsync(CancellationToken cancellation var sfuUri = UriFactory.CreateSfuConnectionUri(_sfuUrl); - await WebsocketClient.ConnectAsync(sfuUri); + await WebsocketClient.ConnectAsync(sfuUri, cancellationToken); //StreamTodo: review when is the actual "connected state" - perhaps not the WS connection itself but receiving an appropriate event should set the flag //e.g. are we able to send any data as soon as the connection is established? @@ -340,7 +340,7 @@ private void OnHandleJoinResponse(JoinResponse joinResponse) { ConnectionState = ConnectionState.Connected; - _connectUserTaskSource.SetResult(true); + _connectUserTaskSource.TrySetResult(true); _connectUserTaskSource = null; JoinResponse?.Invoke(joinResponse); diff --git a/Packages/StreamVideo/Runtime/Core/Stats/IWebRtcStatsCollector.cs b/Packages/StreamVideo/Runtime/Core/Stats/IWebRtcStatsCollector.cs index 29c2db70..ea7367a9 100644 --- a/Packages/StreamVideo/Runtime/Core/Stats/IWebRtcStatsCollector.cs +++ b/Packages/StreamVideo/Runtime/Core/Stats/IWebRtcStatsCollector.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using StreamVideo.v1.Sfu.Models; @@ -6,14 +7,14 @@ namespace StreamVideo.Core.Stats { internal interface IWebRtcStatsCollector { - Task GetPublisherStatsJsonAsync(); + Task GetPublisherStatsJsonAsync(CancellationToken cancellationToken); - Task GetSubscriberStatsJsonAsync(); + Task GetSubscriberStatsJsonAsync(CancellationToken cancellationToken); - Task GetRtcStatsJsonAsync(); + Task GetRtcStatsJsonAsync(CancellationToken cancellationToken); - Task> GetEncodeStatsAsync(); + Task> GetEncodeStatsAsync(CancellationToken cancellationToken); - Task> GetDecodeStatsAsync(); + Task> GetDecodeStatsAsync(CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/Packages/StreamVideo/Runtime/Core/Stats/UnityWebRtcStatsCollector.cs b/Packages/StreamVideo/Runtime/Core/Stats/UnityWebRtcStatsCollector.cs index 54918142..0a47720f 100644 --- a/Packages/StreamVideo/Runtime/Core/Stats/UnityWebRtcStatsCollector.cs +++ b/Packages/StreamVideo/Runtime/Core/Stats/UnityWebRtcStatsCollector.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Reflection; +using System.Threading; using System.Threading.Tasks; using StreamVideo.Core.LowLevelClient; using StreamVideo.Core.Trace; @@ -34,23 +35,23 @@ private class StatSnapshot public Dictionary Dict { get; set; } } - public async Task GetPublisherStatsJsonAsync() + public async Task GetPublisherStatsJsonAsync(CancellationToken cancellationToken) { - var report = await _rtcSession.Publisher.GetStatsReportAsync(); + var report = await _rtcSession.Publisher.GetStatsReportAsync(cancellationToken); return ConvertStatsToJson(report.Stats); } - public async Task GetSubscriberStatsJsonAsync() + public async Task GetSubscriberStatsJsonAsync(CancellationToken cancellationToken) { - var report = await _rtcSession.Subscriber.GetStatsReportAsync(); + var report = await _rtcSession.Subscriber.GetStatsReportAsync(cancellationToken); return ConvertStatsToJson(report.Stats); } - public async Task GetRtcStatsJsonAsync() + public async Task GetRtcStatsJsonAsync(CancellationToken cancellationToken) { // Get both publisher and subscriber stats - var publisherReport = await _rtcSession.Publisher.GetStatsReportAsync(); - var subscriberReport = await _rtcSession.Subscriber.GetStatsReportAsync(); + var publisherReport = await _rtcSession.Publisher.GetStatsReportAsync(cancellationToken); + var subscriberReport = await _rtcSession.Subscriber.GetStatsReportAsync(cancellationToken); // Compute delta-compressed stats var publisherDelta = ComputeDeltaCompression(_previousPublisherStats, publisherReport.Stats); @@ -239,15 +240,15 @@ private Dictionary ComputeDeltaCompression( return result; } - public async Task> GetEncodeStatsAsync() + public async Task> GetEncodeStatsAsync(CancellationToken cancellationToken) { - var report = await _rtcSession.Publisher.GetStatsReportAsync(); + var report = await _rtcSession.Publisher.GetStatsReportAsync(cancellationToken); return ComputeEncodeStats(report.Stats); } - public async Task> GetDecodeStatsAsync() + public async Task> GetDecodeStatsAsync(CancellationToken cancellationToken) { - var report = await _rtcSession.Subscriber.GetStatsReportAsync(); + var report = await _rtcSession.Subscriber.GetStatsReportAsync(cancellationToken); return ComputeDecodeStats(report.Stats); } diff --git a/Packages/StreamVideo/Runtime/Core/Stats/WebRtcStatsSender.cs b/Packages/StreamVideo/Runtime/Core/Stats/WebRtcStatsSender.cs index 9adde97d..b991150e 100644 --- a/Packages/StreamVideo/Runtime/Core/Stats/WebRtcStatsSender.cs +++ b/Packages/StreamVideo/Runtime/Core/Stats/WebRtcStatsSender.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using Google.Protobuf.Collections; using StreamVideo.Core.LowLevelClient; @@ -21,10 +22,15 @@ public void Update() if (_timeService.Time > _lastTimeSent + SendInterval && _currentSendTask == null) { - _currentSendTask = CollectAndSend().ContinueWith(t => + //StreamTODO: consider adding cancellation token support here -> leaving the call should cancel any ongoing operations + _currentSendTask = CollectAndSend(CancellationToken.None).ContinueWith(t => { _currentSendTask = null; - t.LogIfFailed(); + + if (_rtcSession.CallState == CallingState.Joining) + { + t.LogIfFailed(); + } }); _lastTimeSent = _timeService.Time; } @@ -34,7 +40,7 @@ public void Update() /// Sends final stats immediately, flushing any remaining trace data. /// Called when leaving a call to ensure all stats are captured. /// - public async Task SendFinalStatsAsync() + public async Task SendFinalStatsAsync(CancellationToken cancellationToken) { if (_rtcSession.ActiveCall == null) { @@ -46,7 +52,7 @@ public async Task SendFinalStatsAsync() await _currentSendTask; } - await CollectAndSend(); + await CollectAndSend(cancellationToken); } internal WebRtcStatsSender(RtcSession rtcSession, IWebRtcStatsCollector webRtcStatsCollector, @@ -74,18 +80,18 @@ internal WebRtcStatsSender(RtcSession rtcSession, IWebRtcStatsCollector webRtcSt private float _lastTimeSent; private Task _currentSendTask; - private async Task CollectAndSend() + private async Task CollectAndSend(CancellationToken cancellationToken) { if (_rtcSession.ActiveCall == null) { return; } - var subscriberStatsJson = await _webRtcStatsCollector.GetSubscriberStatsJsonAsync(); - var publisherStatsJson = await _webRtcStatsCollector.GetPublisherStatsJsonAsync(); - var rtcStatsJson = await _webRtcStatsCollector.GetRtcStatsJsonAsync(); - var encodeStats = await _webRtcStatsCollector.GetEncodeStatsAsync(); - var decodeStats = await _webRtcStatsCollector.GetDecodeStatsAsync(); + var subscriberStatsJson = await _webRtcStatsCollector.GetSubscriberStatsJsonAsync(cancellationToken); + var publisherStatsJson = await _webRtcStatsCollector.GetPublisherStatsJsonAsync(cancellationToken); + var rtcStatsJson = await _webRtcStatsCollector.GetRtcStatsJsonAsync(cancellationToken); + var encodeStats = await _webRtcStatsCollector.GetEncodeStatsAsync(cancellationToken); + var decodeStats = await _webRtcStatsCollector.GetDecodeStatsAsync(cancellationToken); if (subscriberStatsJson == null || publisherStatsJson == null || rtcStatsJson == null) { @@ -130,7 +136,8 @@ private async Task CollectAndSend() #endif #pragma warning restore CS0162 // Re-enable unreachable code warning - await _rtcSession.SendWebRtcStats(request); + cancellationToken.ThrowIfCancellationRequested(); + await _rtcSession.SendWebRtcStats(request, cancellationToken); } } } \ No newline at end of file diff --git a/Packages/StreamVideo/Runtime/Core/StreamVideoClient.cs b/Packages/StreamVideo/Runtime/Core/StreamVideoClient.cs index 9c97c8c9..b4eb3a07 100644 --- a/Packages/StreamVideo/Runtime/Core/StreamVideoClient.cs +++ b/Packages/StreamVideo/Runtime/Core/StreamVideoClient.cs @@ -5,10 +5,12 @@ using System.Collections; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using StreamVideo.Core.QueryBuilders.Sort.Calls; using StreamVideo.Core.Configs; using StreamVideo.Core.DeviceManagers; +using StreamVideo.Core.Exceptions; using StreamVideo.Core.InternalDTO.Events; using StreamVideo.Core.InternalDTO.Requests; using StreamVideo.Core.LowLevelClient; @@ -40,17 +42,19 @@ public enum DisconnectReason /// SFU server disconnected /// SfuWsDisconnected, - + /// /// /// CoordinatorWsDisconnected, + + VideoServerDisconnected, } - + public delegate void CallHandler(IStreamCall call); public delegate void ConnectHandler(IStreamVideoUser localUser); - + public delegate void DisconnectedHandler(DisconnectReason reason); public class StreamVideoClient : IStreamVideoClient, IInternalStreamVideoClient @@ -107,56 +111,68 @@ public static string CreateDeveloperAuthToken(string userId) /// public static string SanitizeUserId(string userId) => StreamVideoLowLevelClient.SanitizeUserId(userId); - //StreamTODO: this throws exception if the call doesn't exist. Check with other SDKs what is the expected behavior - /// - /// Will return null if the call doesn't exist - /// - public async Task GetCallAsync(StreamCallType callType, string callId) + public Task GetCallAsync(StreamCallType callType, string callId) + => GetCallAsync(callType, callId, CancellationToken.None); + + public async Task GetCallAsync(StreamCallType callType, string callId, + CancellationToken cancellationToken) { //StreamTodo: validate input var callData = await InternalLowLevelClient.InternalVideoClientApi.GetCallAsync(callType, callId, - new GetOrCreateCallRequestInternalDTO()); + new GetOrCreateCallRequestInternalDTO(), cancellationToken); return _cache.TryCreateOrUpdate(callData); } + public Task GetOrCreateCallAsync(StreamCallType callType, string callId) + => GetOrCreateCallAsync(callType, callId, CancellationToken.None); + //StreamTodo: add more params (same as in JoinCallAsync) + add to interface - public async Task GetOrCreateCallAsync(StreamCallType callType, string callId) + public async Task GetOrCreateCallAsync(StreamCallType callType, string callId, CancellationToken cancellationToken) { //StreamTodo: validate input var callData = await InternalLowLevelClient.InternalVideoClientApi.GetOrCreateCallAsync(callType, callId, - new GetOrCreateCallRequestInternalDTO()); + new GetOrCreateCallRequestInternalDTO(), cancellationToken); //StreamTodo: what if null? return _cache.TryCreateOrUpdate(callData); } + public Task JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring, + bool notify) + => JoinCallAsync(callType, callId, create, ring, notify, CancellationToken.None); + //StreamTodo: if ring and notify can't be both true then perhaps enum NotifyMode.Ring, NotifyMode.Notify? //StreamTodo: add CreateCallOptions public async Task JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring, - bool notify) + bool notify, CancellationToken cancellationToken) { - //StreamTodo: check if we're already in a call? + var callState = InternalLowLevelClient.RtcSession.CallState; + if (callState == CallingState.Joined || callState == CallingState.Joining) + { + throw new StreamCallInProgressException("Cannot join a call while another call is active or joining."); + } IStreamCall call; if (!create) { //StreamTodo: check android SDK if the flow is the same - call = await GetCallAsync(callType, callId); + call = await GetCallAsync(callType, callId, cancellationToken); if (call == null) { - throw new InvalidOperationException($"Call with id `{callId}` was not found"); + throw new StreamCallNotFoundException( + $"Call with type: `{callType}`, and ID: `{callId}` was not found. Both type and ID are required to identify a call."); } } else { - call = await GetOrCreateCallAsync(callType, callId); + call = await GetOrCreateCallAsync(callType, callId, cancellationToken); } // StreamTodo: check state if we don't have an active session already - var locationHint = await InternalLowLevelClient.GetLocationHintAsync(); + var locationHint = await InternalLowLevelClient.GetLocationHintAsync(cancellationToken); //StreamTodo: move this logic to call.Join, this way user can create call object and join later on @@ -169,8 +185,9 @@ public async Task JoinCallAsync(StreamCallType callType, string cal Custom = null, Members = null, SettingsOverride = null, - StartsAt = DateTimeOffset - .Now, //StreamTODO: check this, if we're just joining another call perhaps we shouldn't set this? + + //StreamTODO: check this, if we're just joining another call perhaps we shouldn't set this? + StartsAt = DateTimeOffset.Now, Team = null }, Location = locationHint, @@ -184,7 +201,7 @@ var joinCallResponse = await InternalLowLevelClient.InternalVideoClientApi.JoinCallAsync(callType, callId, joinCallRequest); _cache.TryCreateOrUpdate(joinCallResponse); - await InternalLowLevelClient.StartCallSessionAsync((StreamCall)call); + await InternalLowLevelClient.StartCallSessionAsync((StreamCall)call, cancellationToken); CallStarted?.Invoke(call); return call; @@ -206,9 +223,12 @@ public void Dispose() //StreamTodo: Consider removing this overload and exposing ConnectAsync() DisconnectAsync() only. The config would contain credentials (token or token provider), etc. //Similar to Android SDK: https://getstream.io/video/docs/android/guides/client-auth/ - public async Task ConnectUserAsync(AuthCredentials credentials) + public Task ConnectUserAsync(AuthCredentials credentials) + => ConnectUserAsync(credentials, CancellationToken.None); + + public async Task ConnectUserAsync(AuthCredentials credentials, CancellationToken cancellationToken) { - await InternalLowLevelClient.ConnectUserAsync(credentials); + await InternalLowLevelClient.ConnectUserAsync(credentials, cancellationToken); #if STREAM_DEBUG_ENABLED _logs.Warning("StreamVideoClient - CONNECTION - Trigger Connected event."); @@ -458,9 +478,11 @@ private async Task LeaveCallAsync(IStreamCall call) var callState = InternalLowLevelClient.RtcSession.CallState; if (callState == CallingState.Leaving || callState == CallingState.Offline) { - _logs.Warning($"{nameof(LeaveCallAsync)}: Call is already leaving or offline, skipping leave operation."); + _logs.Warning( + $"{nameof(LeaveCallAsync)}: Call is already leaving or offline, skipping leave operation."); return; } + await InternalLowLevelClient.RtcSession.StopAsync("User is leaving the call"); } catch (Exception e) @@ -527,14 +549,21 @@ private void SubscribeTo(StreamVideoLowLevelClient lowLevelClient) lowLevelClient.InternalCallSessionStartedEvent += OnInternalCallSessionStartedEvent; lowLevelClient.InternalCallSessionParticipantJoinedEvent += OnInternalCallSessionParticipantJoinedEvent; lowLevelClient.InternalCallSessionParticipantLeftEvent += OnInternalCallSessionParticipantLeftEvent; - lowLevelClient.InternalCallSessionParticipantCountsUpdatedEvent += OnInternalCallSessionParticipantCountsUpdatedEvent; + lowLevelClient.InternalCallSessionParticipantCountsUpdatedEvent + += OnInternalCallSessionParticipantCountsUpdatedEvent; lowLevelClient.InternalConnectionErrorEvent += OnInternalConnectionErrorEvent; lowLevelClient.InternalCustomVideoEvent += OnInternalCustomVideoEvent; lowLevelClient.Connected += InternalLowLevelClientOnConnected; - + lowLevelClient.Disconnected += OnLowLevelClientDisconnected; lowLevelClient.SfuDisconnected += OnLowLevelClientSfuDisconnected; + lowLevelClient.RtcSession.PeerConnectionDisconnectedDuringSession += OnRtcPeerConnectionDisconnectedDuringSession; + } + + private void OnRtcPeerConnectionDisconnectedDuringSession() + { + Disconnected?.Invoke(DisconnectReason.VideoServerDisconnected); } private void UnsubscribeFrom(StreamVideoLowLevelClient lowLevelClient) @@ -564,14 +593,17 @@ private void UnsubscribeFrom(StreamVideoLowLevelClient lowLevelClient) lowLevelClient.InternalCallSessionStartedEvent -= OnInternalCallSessionStartedEvent; lowLevelClient.InternalCallSessionParticipantJoinedEvent -= OnInternalCallSessionParticipantJoinedEvent; lowLevelClient.InternalCallSessionParticipantLeftEvent -= OnInternalCallSessionParticipantLeftEvent; - lowLevelClient.InternalCallSessionParticipantCountsUpdatedEvent -= OnInternalCallSessionParticipantCountsUpdatedEvent; + lowLevelClient.InternalCallSessionParticipantCountsUpdatedEvent + -= OnInternalCallSessionParticipantCountsUpdatedEvent; lowLevelClient.InternalConnectionErrorEvent -= OnInternalConnectionErrorEvent; lowLevelClient.InternalCustomVideoEvent -= OnInternalCustomVideoEvent; lowLevelClient.Connected -= InternalLowLevelClientOnConnected; - + lowLevelClient.Disconnected -= OnLowLevelClientDisconnected; lowLevelClient.SfuDisconnected -= OnLowLevelClientSfuDisconnected; + + lowLevelClient.RtcSession.PeerConnectionDisconnectedDuringSession -= OnRtcPeerConnectionDisconnectedDuringSession; } private void InternalLowLevelClientOnConnected() @@ -767,7 +799,7 @@ private void OnInternalCallSessionStartedEvent(CallSessionStartedEventInternalDT private void OnInternalCallSessionParticipantJoinedEvent(CallSessionParticipantJoinedEventInternalDTO eventData) { - if(ActiveCall == null || ActiveCall.Cid != eventData.CallCid) + if (ActiveCall == null || ActiveCall.Cid != eventData.CallCid) { return; } @@ -777,21 +809,22 @@ private void OnInternalCallSessionParticipantJoinedEvent(CallSessionParticipantJ private void OnInternalCallSessionParticipantLeftEvent(CallSessionParticipantLeftEventInternalDTO eventData) { - if(ActiveCall == null || ActiveCall.Cid != eventData.CallCid) + if (ActiveCall == null || ActiveCall.Cid != eventData.CallCid) { return; } - + InternalLowLevelClient.RtcSession.ActiveCall.UpdateFromCoordinator(eventData, _cache); } - private void OnInternalCallSessionParticipantCountsUpdatedEvent(CallSessionParticipantCountsUpdatedEventInternalDTO eventData) + private void OnInternalCallSessionParticipantCountsUpdatedEvent( + CallSessionParticipantCountsUpdatedEventInternalDTO eventData) { - if(ActiveCall == null || ActiveCall.Cid != eventData.CallCid) + if (ActiveCall == null || ActiveCall.Cid != eventData.CallCid) { return; } - + InternalLowLevelClient.RtcSession.ActiveCall.UpdateFromCoordinator(eventData); } @@ -817,7 +850,7 @@ private void OnInternalCustomVideoEvent(CustomVideoEventInternalDTO eventData) activeCall.NotifyCallEventReceived(callEvent); } - + private void OnLowLevelClientSfuDisconnected() => Disconnected?.Invoke(DisconnectReason.SfuWsDisconnected); private void OnLowLevelClientDisconnected() => Disconnected?.Invoke(DisconnectReason.CoordinatorWsDisconnected); diff --git a/Packages/StreamVideo/Runtime/Libs/Http/HttpClientAdapter.cs b/Packages/StreamVideo/Runtime/Libs/Http/HttpClientAdapter.cs index 9d93382e..e04aaa27 100644 --- a/Packages/StreamVideo/Runtime/Libs/Http/HttpClientAdapter.cs +++ b/Packages/StreamVideo/Runtime/Libs/Http/HttpClientAdapter.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Net.Http; using System.Net.Http.Headers; +using System.Threading; using System.Threading.Tasks; namespace StreamVideo.Libs.Http @@ -23,7 +24,7 @@ public void AddDefaultCustomHeader(string key, string value) => _httpClient.DefaultRequestHeaders.Add(key, value); public async Task SendHttpRequestAsync(HttpMethodType methodType, Uri uri, - object optionalRequestContent) + object optionalRequestContent, CancellationToken cancellationToken = default) { var httpContent = TryGetHttpContent(optionalRequestContent); @@ -31,13 +32,13 @@ Task ExecuteAsync() { switch (methodType) { - case HttpMethodType.Get: return _httpClient.GetAsync(uri); - case HttpMethodType.Post: return _httpClient.PostAsync(uri, httpContent); - case HttpMethodType.Put: return _httpClient.PutAsync(uri, httpContent); + case HttpMethodType.Get: return _httpClient.GetAsync(uri, cancellationToken); + case HttpMethodType.Post: return _httpClient.PostAsync(uri, httpContent, cancellationToken); + case HttpMethodType.Put: return _httpClient.PutAsync(uri, httpContent, cancellationToken); case HttpMethodType.Patch: return _httpClient.SendAsync(new HttpRequestMessage(new HttpMethod("PATCH"), uri) - { Content = httpContent }); - case HttpMethodType.Delete: return _httpClient.DeleteAsync(uri); + { Content = httpContent }, cancellationToken); + case HttpMethodType.Delete: return _httpClient.DeleteAsync(uri, cancellationToken); default: throw new ArgumentOutOfRangeException(nameof(methodType), methodType, null); } @@ -47,38 +48,38 @@ Task ExecuteAsync() return await HttpResponse.CreateFromHttpResponseMessageAsync(httpResponseMessage); } - public async Task GetAsync(Uri uri) + public async Task GetAsync(Uri uri, CancellationToken cancellationToken = default) { - var response = await _httpClient.GetAsync(uri); + var response = await _httpClient.GetAsync(uri, cancellationToken); return await HttpResponse.CreateFromHttpResponseMessageAsync(response); } - public async Task PostAsync(Uri uri, object content) + public async Task PostAsync(Uri uri, object content, CancellationToken cancellationToken = default) { var httpContent = TryGetHttpContent(content); var response = await _httpClient.PostAsync(uri, httpContent); return await HttpResponse.CreateFromHttpResponseMessageAsync(response); } - public async Task PutAsync(Uri uri, object content) + public async Task PutAsync(Uri uri, object content, CancellationToken cancellationToken = default) { var httpContent = TryGetHttpContent(content); var response = await _httpClient.PutAsync(uri, httpContent); return await HttpResponse.CreateFromHttpResponseMessageAsync(response); } - public async Task PatchAsync(Uri uri, object content) + public async Task PatchAsync(Uri uri, object content, CancellationToken cancellationToken = default) { var httpContent = TryGetHttpContent(content); var response = await _httpClient.SendAsync(new HttpRequestMessage(new HttpMethod("PATCH"), uri) - { Content = httpContent }); + { Content = httpContent }, cancellationToken); return await HttpResponse.CreateFromHttpResponseMessageAsync(response); } public async Task HeadAsync(Uri uri, - ICollection>> resultHeaders = null) + ICollection>> resultHeaders = null, CancellationToken cancellationToken = default) { - var response = await _httpClient.SendAsync(new HttpRequestMessage(new HttpMethod("HEAD"), uri)); + var response = await _httpClient.SendAsync(new HttpRequestMessage(new HttpMethod("HEAD"), uri), cancellationToken); if (resultHeaders != null) { foreach (var header in response.Headers) @@ -90,9 +91,9 @@ public async Task HeadAsync(Uri uri, return await HttpResponse.CreateFromHttpResponseMessageAsync(response); } - public async Task DeleteAsync(Uri uri) + public async Task DeleteAsync(Uri uri, CancellationToken cancellationToken = default) { - var response = await _httpClient.DeleteAsync(uri); + var response = await _httpClient.DeleteAsync(uri, cancellationToken); return await HttpResponse.CreateFromHttpResponseMessageAsync(response); } diff --git a/Packages/StreamVideo/Runtime/Libs/Http/IHttpClient.cs b/Packages/StreamVideo/Runtime/Libs/Http/IHttpClient.cs index 2b41ed34..a0389b29 100644 --- a/Packages/StreamVideo/Runtime/Libs/Http/IHttpClient.cs +++ b/Packages/StreamVideo/Runtime/Libs/Http/IHttpClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace StreamVideo.Libs.Http @@ -13,18 +14,19 @@ public interface IHttpClient void AddDefaultCustomHeader(string key, string value); - Task GetAsync(Uri uri); + Task GetAsync(Uri uri, CancellationToken cancellationToken = default); - Task PostAsync(Uri uri, object content); + Task PostAsync(Uri uri, object content, CancellationToken cancellationToken = default); - Task PutAsync(Uri uri, object content); + Task PutAsync(Uri uri, object content, CancellationToken cancellationToken = default); - Task PatchAsync(Uri uri, object content); + Task PatchAsync(Uri uri, object content, CancellationToken cancellationToken = default); - Task DeleteAsync(Uri uri); + Task DeleteAsync(Uri uri, CancellationToken cancellationToken = default); - Task SendHttpRequestAsync(HttpMethodType methodType, Uri uri, object optionalRequestContent); + Task SendHttpRequestAsync(HttpMethodType methodType, Uri uri, object optionalRequestContent, CancellationToken cancellationToken = default); - Task HeadAsync(Uri uri, ICollection>> resultHeaders = null); + Task HeadAsync(Uri uri, + ICollection>> resultHeaders = null, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/Packages/StreamVideo/Runtime/Libs/Http/UnityWebRequestHttpClient.cs b/Packages/StreamVideo/Runtime/Libs/Http/UnityWebRequestHttpClient.cs index f4100ad1..93f750bd 100644 --- a/Packages/StreamVideo/Runtime/Libs/Http/UnityWebRequestHttpClient.cs +++ b/Packages/StreamVideo/Runtime/Libs/Http/UnityWebRequestHttpClient.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Text; +using System.Threading; using System.Threading.Tasks; using UnityEngine.Networking; @@ -15,28 +16,31 @@ public class UnityWebRequestHttpClient : IHttpClient public void AddDefaultCustomHeader(string key, string value) => _headers[key] = value; - public Task GetAsync(Uri uri) => SendWebRequest(uri, UnityWebRequest.kHttpVerbGET); + public Task GetAsync(Uri uri, CancellationToken cancellationToken = default) + => SendWebRequest(uri, UnityWebRequest.kHttpVerbGET, cancellationToken: cancellationToken); - public Task PostAsync(Uri uri, object content) - => SendWebRequest(uri, UnityWebRequest.kHttpVerbPOST, content); + public Task PostAsync(Uri uri, object content, CancellationToken cancellationToken = default) + => SendWebRequest(uri, UnityWebRequest.kHttpVerbPOST, content, cancellationToken: cancellationToken); - public Task PutAsync(Uri uri, object content) - => SendWebRequest(uri, UnityWebRequest.kHttpVerbPUT, content); + public Task PutAsync(Uri uri, object content, CancellationToken cancellationToken = default) + => SendWebRequest(uri, UnityWebRequest.kHttpVerbPUT, content, cancellationToken: cancellationToken); - public Task PatchAsync(Uri uri, object content) => SendWebRequest(uri, HttpPatchMethod, content); + public Task PatchAsync(Uri uri, object content, CancellationToken cancellationToken = default) + => SendWebRequest(uri, HttpPatchMethod, content, cancellationToken: cancellationToken); - public Task DeleteAsync(Uri uri) => SendWebRequest(uri, UnityWebRequest.kHttpVerbDELETE); + public Task DeleteAsync(Uri uri, CancellationToken cancellationToken = default) + => SendWebRequest(uri, UnityWebRequest.kHttpVerbDELETE, cancellationToken: cancellationToken); public Task SendHttpRequestAsync(HttpMethodType methodType, Uri uri, - object optionalRequestContent) + object optionalRequestContent, CancellationToken cancellationToken = default) { var httpMethodKey = GetHttpMethodKey(methodType); - return SendWebRequest(uri, httpMethodKey, optionalRequestContent); + return SendWebRequest(uri, httpMethodKey, optionalRequestContent, cancellationToken: cancellationToken); } public Task HeadAsync(Uri uri, - ICollection>> resultHeaders) - => SendWebRequest(uri, HttpHeadMethod, resultHeaders: resultHeaders); + ICollection>> resultHeaders, CancellationToken cancellationToken) + => SendWebRequest(uri, HttpHeadMethod, resultHeaders: resultHeaders, cancellationToken: cancellationToken); private const string HttpHeadMethod = "HEAD"; private const string HttpPatchMethod = "PATCH"; @@ -60,7 +64,8 @@ private static string GetHttpMethodKey(HttpMethodType methodType) //StreamTodo: add cancellationToken support that will trigger https://docs.unity3d.com/ScriptReference/Networking.UnityWebRequest.Abort.html //StreamTodo: refactor to remove duplication private async Task SendWebRequest(Uri uri, string httpMethod, - object optionalContent = null, ICollection>> resultHeaders = null) + object optionalContent = null, ICollection>> resultHeaders = null, + CancellationToken cancellationToken = default) { if (optionalContent is FileWrapper fileWrapper) { @@ -84,6 +89,7 @@ private async Task SendWebRequest(Uri uri, string httpMethod, while (!asyncOperation.isDone) { + cancellationToken.ThrowIfCancellationRequested(); await Task.Yield(); } @@ -131,6 +137,7 @@ private async Task SendWebRequest(Uri uri, string httpMethod, while (!asyncOperation.isDone) { + cancellationToken.ThrowIfCancellationRequested(); await Task.Yield(); } diff --git a/Packages/StreamVideo/Runtime/Libs/Websockets/IWebsocketClient.cs b/Packages/StreamVideo/Runtime/Libs/Websockets/IWebsocketClient.cs index 2a41f90a..8b9bd112 100644 --- a/Packages/StreamVideo/Runtime/Libs/Websockets/IWebsocketClient.cs +++ b/Packages/StreamVideo/Runtime/Libs/Websockets/IWebsocketClient.cs @@ -1,5 +1,6 @@ using System; using System.Net.WebSockets; +using System.Threading; using System.Threading.Tasks; namespace StreamVideo.Libs.Websockets @@ -18,7 +19,7 @@ public interface IWebsocketClient : IDisposable bool TryDequeueMessage(out byte[] message); - Task ConnectAsync(Uri serverUri); + Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken); void Update(); diff --git a/Packages/StreamVideo/Runtime/Libs/Websockets/NativeWebSocketWrapper.cs b/Packages/StreamVideo/Runtime/Libs/Websockets/NativeWebSocketWrapper.cs index 23422f7d..7878c407 100644 --- a/Packages/StreamVideo/Runtime/Libs/Websockets/NativeWebSocketWrapper.cs +++ b/Packages/StreamVideo/Runtime/Libs/Websockets/NativeWebSocketWrapper.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Net.WebSockets; using System.Text; +using System.Threading; using System.Threading.Tasks; using NativeWebSocket; using StreamVideo.Libs.Utils; @@ -37,7 +38,7 @@ public bool TryDequeueMessage(out byte[] message) return message != null; } - public async Task ConnectAsync(Uri serverUri) + public async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken) { if (_webSocket != null) { diff --git a/Packages/StreamVideo/Runtime/Libs/Websockets/WebsocketClient.cs b/Packages/StreamVideo/Runtime/Libs/Websockets/WebsocketClient.cs index 28a1bb5d..caa962af 100644 --- a/Packages/StreamVideo/Runtime/Libs/Websockets/WebsocketClient.cs +++ b/Packages/StreamVideo/Runtime/Libs/Websockets/WebsocketClient.cs @@ -38,7 +38,7 @@ public WebsocketClient(ILogs logs, Encoding encoding = default, bool isDebugMode public bool TryDequeueMessage(out byte[] message) => _receiveQueue.TryDequeue(out message); - public async Task ConnectAsync(Uri serverUri) + public async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken) { if (IsConnected || IsConnecting) { @@ -52,7 +52,8 @@ public async Task ConnectAsync(Uri serverUri) { await TryDisposeResourcesAsync(WebSocketCloseStatus.NormalClosure, "Clean up resources before connecting"); - _connectionCts = new CancellationTokenSource(); + cancellationToken.ThrowIfCancellationRequested(); + _connectionCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _internalClient = new ClientWebSocket(); _internalClient.Options.SetRequestHeader("User-Agent", "unity-video-sdk-ws-client"); @@ -78,7 +79,7 @@ await TryDisposeResourcesAsync(WebSocketCloseStatus.NormalClosure, } catch (Exception e) { - _logs.Exception(e); + LogExceptionIfDebugMode(e); OnConnectionFailed(); throw; }