Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using StreamVideo.Core.StatefulModels;

namespace StreamVideo.Core.Exceptions
{
/// <summary>
/// Exception thrown when trying to join a <see cref="IStreamCall"/> but another <see cref="IStreamCall"/> is already joined or currently joining
/// </summary>
public class StreamCallInProgressException : Exception
{
public StreamCallInProgressException(string message) : base(message)
{
}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using StreamVideo.Core.StatefulModels;

namespace StreamVideo.Core.Exceptions
{
/// <summary>
/// Exception thrown when a <see cref="IStreamCall"/> with provided ID is not found
/// </summary>
public class StreamCallNotFoundException : Exception
{
public StreamCallNotFoundException(string message) : base(message)
{

}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 21 additions & 1 deletion Packages/StreamVideo/Runtime/Core/IStreamVideoClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using StreamVideo.Core.QueryBuilders.Sort.Calls;
using StreamVideo.Core.DeviceManagers;
Expand Down Expand Up @@ -67,6 +68,8 @@ public interface IStreamVideoClient : IStreamVideoClientEventsListener, IDisposa
/// </summary>
/// <param name="credentials">Credentials required to connect user: api_key, user_id, and user_token</param>
Task<IStreamVideoUser> ConnectUserAsync(AuthCredentials credentials);

Task<IStreamVideoUser> ConnectUserAsync(AuthCredentials credentials, CancellationToken cancellationToken);

/// <summary>
/// Disconnect user from Stream server.
Expand All @@ -76,10 +79,19 @@ public interface IStreamVideoClient : IStreamVideoClientEventsListener, IDisposa
Task<IStreamCall> JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring,
bool notify);

Task<IStreamCall> JoinCallAsync(StreamCallType callType, string callId, bool create, bool ring,
bool notify, CancellationToken cancellationToken);

/// <summary>
/// Gets call information without joining it. Will return null if the call doesn't exist
/// Gets <see cref="IStreamCall"/> information without joining it. Will return null if the call doesn't exist
/// </summary>
Task<IStreamCall> GetCallAsync(StreamCallType callType, string callId);

/// <summary>
/// Gets <see cref="IStreamCall"/> information without joining it. Will return null if the call doesn't exist
/// </summary>
Task<IStreamCall> GetCallAsync(StreamCallType callType, string callId,
CancellationToken cancellationToken);

/// <summary>
/// Get a call with a specified Type and ID. If such a call doesn't exist, it will be created.
Expand All @@ -88,6 +100,14 @@ Task<IStreamCall> JoinCallAsync(StreamCallType callType, string callId, bool cre
/// <param name="callId">Call ID</param>
/// <returns>Call object of type: <see cref="IStreamCall"/></returns>
Task<IStreamCall> GetOrCreateCallAsync(StreamCallType callType, string callId);

/// <summary>
/// Get a call with a specified Type and ID. If such a call doesn't exist, it will be created.
/// </summary>
/// <param name="callType">Call type - this defines the permissions and other settings for the call. Read more in the <a href="https://getstream.io/video/docs/unity/guides/call-types/">Call Types Docs</a></param>
/// <param name="callId">Call ID</param>
/// <returns>Call object of type: <see cref="IStreamCall"/></returns>
Task<IStreamCall> GetOrCreateCallAsync(StreamCallType callType, string callId, CancellationToken cancellationToken);

/// <summary>
/// Query calls
Expand Down
43 changes: 23 additions & 20 deletions Packages/StreamVideo/Runtime/Core/InternalDTO/Sfu/GeneratedAPI.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Threading;

namespace StreamVideo.Core.Sfu {
using Signal = StreamVideo.v1.Sfu.Signal;
// Generated by protoc-gen-twirpcs. DO NOT EDIT!
Expand Down Expand Up @@ -46,11 +48,12 @@ private static string parseJSONString(string jsonData, string key) {
}

private delegate Resp doParsing<Resp>(byte[] data) where Resp : IMessage;
private static async Task<Resp> DoRequest<Req, Resp>(HttpClient client, string address, Req req, doParsing<Resp> parserFunc) where Req : IMessage where Resp : IMessage {
private static async Task<Resp> DoRequest<Req, Resp>(HttpClient client, string address, Req req, doParsing<Resp> parserFunc, CancellationToken cancellationToken = default) where Req : IMessage where Resp : IMessage {
using (var content = new ByteArrayContent(req.ToByteArray())) {
content.Headers.ContentType = CONTENT_TYPE_PROTOBUF;
using (HttpResponseMessage response = await client.PostAsync(address, content)) {
using (HttpResponseMessage response = await client.PostAsync(address, content, cancellationToken)) {
var byteArr = await response.Content.ReadAsByteArrayAsync();
cancellationToken.ThrowIfCancellationRequested();
if (!response.IsSuccessStatusCode) {
string errorJSON = System.Text.Encoding.UTF8.GetString(byteArr, 0, byteArr.Length);
throw createException(errorJSON);
Expand All @@ -61,44 +64,44 @@ private static async Task<Resp> DoRequest<Req, Resp>(HttpClient client, string a
}

// SetPublisher sends the WebRTC offer for the peer connection used to publish A/V
public static async Task<Signal.SetPublisherResponse> SetPublisher(HttpClient client, Signal.SetPublisherRequest req) {
return await DoRequest<Signal.SetPublisherRequest, Signal.SetPublisherResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SetPublisher", req, Signal.SetPublisherResponse.Parser.ParseFrom);
public static async Task<Signal.SetPublisherResponse> SetPublisher(HttpClient client, Signal.SetPublisherRequest req, CancellationToken cancellationToken = default) {
return await DoRequest<Signal.SetPublisherRequest, Signal.SetPublisherResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SetPublisher", req, Signal.SetPublisherResponse.Parser.ParseFrom, cancellationToken);
}

// answer is sent by the client to the SFU after receiving a subscriber_offer.
public static async Task<Signal.SendAnswerResponse> SendAnswer(HttpClient client, Signal.SendAnswerRequest req) {
return await DoRequest<Signal.SendAnswerRequest, Signal.SendAnswerResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SendAnswer", req, Signal.SendAnswerResponse.Parser.ParseFrom);
public static async Task<Signal.SendAnswerResponse> SendAnswer(HttpClient client, Signal.SendAnswerRequest req, CancellationToken cancellationToken = default) {
return await DoRequest<Signal.SendAnswerRequest, Signal.SendAnswerResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SendAnswer", req, Signal.SendAnswerResponse.Parser.ParseFrom, cancellationToken);
}

// SendICECandidate sends an ICE candidate to the client
public static async Task<StreamVideo.v1.Sfu.Signal.ICETrickleResponse> IceTrickle(HttpClient client, StreamVideo.v1.Sfu.Models.ICETrickle req) {
return await DoRequest<StreamVideo.v1.Sfu.Models.ICETrickle, StreamVideo.v1.Sfu.Signal.ICETrickleResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/IceTrickle", req, StreamVideo.v1.Sfu.Signal.ICETrickleResponse.Parser.ParseFrom);
public static async Task<StreamVideo.v1.Sfu.Signal.ICETrickleResponse> IceTrickle(HttpClient client, StreamVideo.v1.Sfu.Models.ICETrickle req, CancellationToken cancellationToken = default) {
return await DoRequest<StreamVideo.v1.Sfu.Models.ICETrickle, StreamVideo.v1.Sfu.Signal.ICETrickleResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/IceTrickle", req, StreamVideo.v1.Sfu.Signal.ICETrickleResponse.Parser.ParseFrom, cancellationToken);
}

// UpdateSubscribers is used to notify the SFU about the list of video subscriptions
// TODO: sync subscriptions based on this + update tracks using the dimension info sent by the user
public static async Task<Signal.UpdateSubscriptionsResponse> UpdateSubscriptions(HttpClient client, Signal.UpdateSubscriptionsRequest req) {
return await DoRequest<Signal.UpdateSubscriptionsRequest, Signal.UpdateSubscriptionsResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateSubscriptions", req, Signal.UpdateSubscriptionsResponse.Parser.ParseFrom);
public static async Task<Signal.UpdateSubscriptionsResponse> UpdateSubscriptions(HttpClient client, Signal.UpdateSubscriptionsRequest req, CancellationToken cancellationToken = default) {
return await DoRequest<Signal.UpdateSubscriptionsRequest, Signal.UpdateSubscriptionsResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateSubscriptions", req, Signal.UpdateSubscriptionsResponse.Parser.ParseFrom, cancellationToken);
}

public static async Task<Signal.UpdateMuteStatesResponse> UpdateMuteStates(HttpClient client, Signal.UpdateMuteStatesRequest req) {
return await DoRequest<Signal.UpdateMuteStatesRequest, Signal.UpdateMuteStatesResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateMuteStates", req, Signal.UpdateMuteStatesResponse.Parser.ParseFrom);
public static async Task<Signal.UpdateMuteStatesResponse> UpdateMuteStates(HttpClient client, Signal.UpdateMuteStatesRequest req, CancellationToken cancellationToken = default) {
return await DoRequest<Signal.UpdateMuteStatesRequest, Signal.UpdateMuteStatesResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/UpdateMuteStates", req, Signal.UpdateMuteStatesResponse.Parser.ParseFrom, cancellationToken);
}

public static async Task<Signal.ICERestartResponse> IceRestart(HttpClient client, Signal.ICERestartRequest req) {
return await DoRequest<Signal.ICERestartRequest, Signal.ICERestartResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/IceRestart", req, Signal.ICERestartResponse.Parser.ParseFrom);
public static async Task<Signal.ICERestartResponse> IceRestart(HttpClient client, Signal.ICERestartRequest req, CancellationToken cancellationToken = default) {
return await DoRequest<Signal.ICERestartRequest, Signal.ICERestartResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/IceRestart", req, Signal.ICERestartResponse.Parser.ParseFrom, cancellationToken);
}

public static async Task<Signal.SendStatsResponse> SendStats(HttpClient client, Signal.SendStatsRequest req) {
return await DoRequest<Signal.SendStatsRequest, Signal.SendStatsResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SendStats", req, Signal.SendStatsResponse.Parser.ParseFrom);
public static async Task<Signal.SendStatsResponse> SendStats(HttpClient client, Signal.SendStatsRequest req, CancellationToken cancellationToken = default) {
return await DoRequest<Signal.SendStatsRequest, Signal.SendStatsResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/SendStats", req, Signal.SendStatsResponse.Parser.ParseFrom, cancellationToken);
}

public static async Task<Signal.StartNoiseCancellationResponse> StartNoiseCancellation(HttpClient client, Signal.StartNoiseCancellationRequest req) {
return await DoRequest<Signal.StartNoiseCancellationRequest, Signal.StartNoiseCancellationResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/StartNoiseCancellation", req, Signal.StartNoiseCancellationResponse.Parser.ParseFrom);
public static async Task<Signal.StartNoiseCancellationResponse> StartNoiseCancellation(HttpClient client, Signal.StartNoiseCancellationRequest req, CancellationToken cancellationToken = default) {
return await DoRequest<Signal.StartNoiseCancellationRequest, Signal.StartNoiseCancellationResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/StartNoiseCancellation", req, Signal.StartNoiseCancellationResponse.Parser.ParseFrom, cancellationToken);
}

public static async Task<Signal.StopNoiseCancellationResponse> StopNoiseCancellation(HttpClient client, Signal.StopNoiseCancellationRequest req) {
return await DoRequest<Signal.StopNoiseCancellationRequest, Signal.StopNoiseCancellationResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/StopNoiseCancellation", req, Signal.StopNoiseCancellationResponse.Parser.ParseFrom);
public static async Task<Signal.StopNoiseCancellationResponse> StopNoiseCancellation(HttpClient client, Signal.StopNoiseCancellationRequest req, CancellationToken cancellationToken = default) {
return await DoRequest<Signal.StopNoiseCancellationRequest, Signal.StopNoiseCancellationResponse>(client, "/twirp/stream.video.sfu.signal.SignalServer/StopNoiseCancellation", req, Signal.StopNoiseCancellationResponse.Parser.ParseFrom, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Threading;
using System.Threading.Tasks;
using StreamVideo.Core.InternalDTO.Models;
using StreamVideo.Core.InternalDTO.Requests;
Expand All @@ -8,13 +9,13 @@ namespace StreamVideo.Core.LowLevelClient.API.Internal
internal interface IInternalVideoClientApi
{
Task<GetCallResponseInternalDTO> GetCallAsync(StreamCallType callType, string callId,
GetOrCreateCallRequestInternalDTO getCallRequest);
GetOrCreateCallRequestInternalDTO getCallRequest, CancellationToken cancellationToken);

Task<UpdateCallResponseInternalDTO> UpdateCallAsync(StreamCallType callType, string callId,
UpdateCallRequestInternalDTO updateCallRequest);

Task<GetOrCreateCallResponseInternalDTO> GetOrCreateCallAsync(StreamCallType callType, string callId,
GetOrCreateCallRequestInternalDTO getOrCreateCallRequest);
GetOrCreateCallRequestInternalDTO getOrCreateCallRequest, CancellationToken cancellationToken);

Task<AcceptCallResponseInternalDTO> AcceptCallAsync(StreamCallType callType, string callId);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using StreamVideo.Core.Exceptions;
using StreamVideo.Core.InternalDTO.Models;
Expand All @@ -25,14 +26,18 @@ protected InternalApiClientBase(IHttpClient httpClient, ISerializer serializer,
_lowLevelClient = lowLevelClient ?? throw new ArgumentNullException(nameof(lowLevelClient));
}

protected Task<TResponse> Get<TPayload, TResponse>(string endpoint, TPayload payload)
=> HttpRequest<TResponse>(HttpMethodType.Get, endpoint, payload);
//StreamTODO: add cancellation token support to all

protected Task<TResponse> Get<TPayload, TResponse>(string endpoint, TPayload payload,
CancellationToken cancellationToken)
=> HttpRequest<TResponse>(HttpMethodType.Get, endpoint, payload, cancellationToken: cancellationToken);

protected Task<TResponse> Get<TResponse>(string endpoint, QueryParameters parameters = null)
=> HttpRequest<TResponse>(HttpMethodType.Get, endpoint, queryParameters: parameters);

protected Task<TResponse> Post<TRequest, TResponse>(string endpoint, TRequest request = default)
=> HttpRequest<TResponse>(HttpMethodType.Post, endpoint, request);
protected Task<TResponse> Post<TRequest, TResponse>(string endpoint, TRequest request = default,
CancellationToken cancellationToken = default)
=> HttpRequest<TResponse>(HttpMethodType.Post, endpoint, request, cancellationToken: cancellationToken);

protected Task<TResponse> Post<TResponse>(string endpoint, object request = null)
=> HttpRequest<TResponse>(HttpMethodType.Post, endpoint, request);
Expand Down Expand Up @@ -74,7 +79,7 @@ private object TrySerializeRequestBodyContent(object content, out string seriali
}

private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod, string endpoint,
object requestBody = default, QueryParameters queryParameters = null)
object requestBody = default, QueryParameters queryParameters = null, CancellationToken cancellationToken = default)
{
// //StreamTodo: perhaps remove this requirement, sometimes we send empty body without any properties
// if (requestBody == null && IsRequestBodyRequiredByHttpMethod(httpMethod))
Expand All @@ -95,7 +100,7 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,

LogFutureRequestIfDebug(uri, endpoint, httpMethod, logContent);

var httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent);
var httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent, cancellationToken);
var responseContent = httpResponse.Result;

if (!httpResponse.IsSuccessStatusCode)
Expand All @@ -121,6 +126,7 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,
{
_logs.Info($"Http request failed due to expired token, connection id: {_lowLevelClient.ConnectionId}");
await _lowLevelClient.DisconnectAsync();
cancellationToken.ThrowIfCancellationRequested();
}

//StreamTodo: Refactor Token refresh logic. This relies on the fact that connecting fetches fresh token. But we can probably replace the token without breaking the connection
Expand All @@ -135,7 +141,7 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,
while (_lowLevelClient.ConnectionState != ConnectionState.Connected)
{
i++;
await Task.Delay(1);
await Task.Delay(1, cancellationToken);

if (i > maxMsToWait)
{
Expand All @@ -152,7 +158,7 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,
// Recreate the uri to include new connection id
uri = _requestUriFactory.CreateEndpointUri(endpoint, queryParameters);

httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent);
httpResponse = await _httpClient.SendHttpRequestAsync(httpMethod, uri, httpContent, cancellationToken);
responseContent = httpResponse.Result;
}

Expand Down
Loading
Loading