Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,7 @@ Assets/LocalOnlyTools_53854/
Assets/LocalOnlyTools_53854.meta
last_build_version_4983432740324322.txt
last_build_version_4983432740324322.meta
replace_imported_sample_scene_credentials.bat
replace_imported_sample_scene_credentials.ps1
replace_imported_sample_scene_credentials.sh

141 changes: 113 additions & 28 deletions Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ internal sealed class RtcSession : IMediaInputProvider, IDisposable
public const bool UseNativeAudioBindings = false;
#endif

public const int MaxParticipantsForVideoAutoSubscription = 5;

public event Action<bool> PublisherAudioTrackIsEnabledChanged;
public event Action<bool> PublisherVideoTrackIsEnabledChanged;

Expand Down Expand Up @@ -214,7 +216,7 @@ public Camera VideoSceneInput
}

#endregion

public bool ShouldSfuAttemptToReconnect()
{
if (CallState != CallingState.Joined && CallState != CallingState.Joining)
Expand Down Expand Up @@ -344,6 +346,8 @@ private void ValidateCallCredentialsOrThrow(IStreamCall call)
}
}

public void SetCallingState(CallingState newState) => CallState = newState;

public async Task StartAsync(StreamCall call, CancellationToken cancellationToken = default)
{
if (ActiveCall != null)
Expand All @@ -360,7 +364,7 @@ public async Task StartAsync(StreamCall call, CancellationToken cancellationToke
try
{
_logs.Info($"Start joining a call: type={call.Type}, id={call.Id}");

//StreamTodo: perhaps not necessary here
ClearSession();

Expand Down Expand Up @@ -422,6 +426,12 @@ public async Task StartAsync(StreamCall call, CancellationToken cancellationToke
_sfuWebSocket.SetSessionData(SessionId, offer.sdp, sfuUrl, sfuToken);
await _sfuWebSocket.ConnectAsync(cancellationToken);

#if STREAM_TESTS_ENABLED && UNITY_EDITOR
// Simulate a bit of delay for tests so we can test killing the operation in progress
//StreamTOdo: we could add fake delays in multiple places and this way control exiting from every step in tests
await Task.Delay(100);
#endif

// Wait for call to be joined with timeout
const int joinTimeoutSeconds = 30;
var joinStartTime = _timeService.Time;
Expand Down Expand Up @@ -453,6 +463,11 @@ public async Task StartAsync(StreamCall call, CancellationToken cancellationToke
WebRTC.StartAudioPlayback(AudioOutputSampleRate, AudioOutputChannels);
#endif
}

foreach(var p in ActiveCall.Participants)
{
NotifyParticipantJoined(p.SessionId);
}

//StreamTodo: validate when this state should set
CallState = CallingState.Joined;
Expand All @@ -465,12 +480,12 @@ public async Task StartAsync(StreamCall call, CancellationToken cancellationToke
_videoAudioSyncBenchmark?.Init(call);
#endif
}
catch(OperationCanceledException)
catch (OperationCanceledException)
{
ClearSession();
throw;
}
catch(Exception e)
catch (Exception e)
{
_logs.Exception(e);
ClearSession();
Expand All @@ -494,7 +509,7 @@ public async Task StopAsync(string reason = "")
WebRTC.StopAudioPlayback();
#endif
}

if (CallState == CallingState.Leaving || CallState == CallingState.Offline)
{
//StreamTODO: should this return a task of the ongoing stop?
Expand Down Expand Up @@ -537,6 +552,14 @@ public async Task StopAsync(string reason = "")
}
}
}
catch (HttpRequestException httpEx)
{
_logs.Info($"Network unavailable during final stats send: {httpEx.Message}");
}
catch (OperationCanceledException)
{
_logs.Info("Final stats send timed out.");
}
catch (Exception e)
{
_logs.Warning($"Failed to send final stats on leave: {e.Message}");
Expand Down Expand Up @@ -581,6 +604,42 @@ public void UpdateRequestedVideoResolution(string participantSessionId, VideoRes
QueueTracksSubscriptionRequest();
}

public void UpdateIncomingVideoRequested(string participantSessionId, bool isRequested)
{
_incomingVideoRequestedByParticipantSessionId[participantSessionId] = isRequested;
QueueTracksSubscriptionRequest();
}

public void UpdateIncomingAudioRequested(string participantSessionId, bool isRequested)
{
_incomingAudioRequestedByParticipantSessionId[participantSessionId] = isRequested;
QueueTracksSubscriptionRequest();
}

// Let's request video for the first 10 participants that join
public void NotifyParticipantJoined(string participantSessionId)
{
if (ActiveCall == null)
{
return;
}

var participantCount = ActiveCall.Participants?.Count ?? 0;
var requestVideo = participantCount <= MaxParticipantsForVideoAutoSubscription;
var requestAudio = true; // No limit by default

_incomingVideoRequestedByParticipantSessionId.TryAdd(participantSessionId, requestVideo);
_incomingAudioRequestedByParticipantSessionId.TryAdd(participantSessionId, requestAudio);
}

public void NotifyParticipantLeft(string participantSessionId)
{
_videoResolutionByParticipantSessionId.Remove(participantSessionId);
_incomingVideoRequestedByParticipantSessionId.Remove(participantSessionId);
_incomingAudioRequestedByParticipantSessionId.Remove(participantSessionId);
QueueTracksSubscriptionRequest();
}

public void SetAudioRecordingDevice(MicrophoneDeviceInfo device)
{
_logs.WarningIfDebug("RtcSession.SetAudioRecordingDevice device: " + device);
Expand Down Expand Up @@ -659,6 +718,12 @@ public void ResumeAndroidAudioPlayback()
private readonly Dictionary<string, VideoResolution> _videoResolutionByParticipantSessionId
= new Dictionary<string, VideoResolution>();

private readonly Dictionary<string, bool> _incomingVideoRequestedByParticipantSessionId
= new Dictionary<string, bool>();

private readonly Dictionary<string, bool> _incomingAudioRequestedByParticipantSessionId
= new Dictionary<string, bool>();

private HttpClient _httpClient;
private CallingState _callState;

Expand Down Expand Up @@ -686,6 +751,8 @@ private void ClearSession()

_pendingIceTrickleRequests.Clear();
_videoResolutionByParticipantSessionId.Clear();
_incomingVideoRequestedByParticipantSessionId.Clear();
_incomingAudioRequestedByParticipantSessionId.Clear();
_tracerManager?.Clear();

Subscriber?.Dispose();
Expand Down Expand Up @@ -843,33 +910,47 @@ private IEnumerable<TrackSubscriptionDetails> GetDesiredTracksDetails()
continue;
}

var requestedVideoResolution = GetRequestedVideoResolution(participant);

foreach (var trackType in trackTypes)
var userId = GetUserId(participant);
if (string.IsNullOrEmpty(userId))
{
//StreamTODO: UserId is sometimes null here
//This was before changing the IUpdateableFrom<CallParticipantResponseInternalDTO, StreamVideoCallParticipant>.UpdateFromDto
//to extract UserId from User obj
_logs.Error(
$"Cannot subscribe to any tracks - participant UserId is null or empty. SessionID: {participant.SessionId}");
continue;
}

var userId = GetUserId(participant);
if (string.IsNullOrEmpty(userId))
var shouldConsumeAudio = ShouldSubscribeToAudioTrack(participant);
if (shouldConsumeAudio)
{
yield return new TrackSubscriptionDetails
{
_logs.Error(
$"Cannot subscribe to {trackType} - participant UserId is null or empty. SessionID: {participant.SessionId}");
continue;
}
UserId = userId,
SessionId = participant.SessionId,
TrackType = SfuTrackType.Audio,
};
}

var shouldConsumeVideo = ShouldSubscribeToVideoTrack(participant);
if (shouldConsumeVideo)
{
var requestedVideoResolution = GetRequestedVideoResolution(participant);

yield return new TrackSubscriptionDetails
{
UserId = userId,
SessionId = participant.SessionId,
TrackType = trackType,
TrackType = SfuTrackType.Video,
Dimension = requestedVideoResolution.ToVideoDimension()
};
}
}
}

private bool ShouldSubscribeToVideoTrack(IStreamVideoCallParticipant participant)
=> _incomingVideoRequestedByParticipantSessionId.GetValueOrDefault(participant.SessionId, false);

private bool ShouldSubscribeToAudioTrack(IStreamVideoCallParticipant participant)
=> _incomingAudioRequestedByParticipantSessionId.GetValueOrDefault(participant.SessionId, false);

//StreamTodo: remove this, this is a workaround to Null UserId error
private string GetUserId(IStreamVideoCallParticipant participant)
{
Expand Down Expand Up @@ -963,7 +1044,7 @@ private void UpdateAudioRecording()
private void OnSfuJoinResponse(JoinResponse joinResponse)
{
//StreamTODO: what if left the call and started a new one but the JoinResponse belongs to the previous session?

_sfuTracer?.Trace(PeerConnectionTraceKey.JoinRequest, joinResponse);
_logs.InfoIfDebug($"Handle Sfu {nameof(JoinResponse)}");
ActiveCall.UpdateFromSfu(joinResponse);
Expand Down Expand Up @@ -1029,7 +1110,7 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer)
{
return;
}

//StreamTodo: handle subscriberOffer.iceRestart
var rtcSessionDescription = new RTCSessionDescription
{
Expand All @@ -1039,7 +1120,8 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer)

try
{
await Subscriber.SetRemoteDescriptionAsync(rtcSessionDescription, GetCurrentCancellationTokenOrDefault());
await Subscriber.SetRemoteDescriptionAsync(rtcSessionDescription,
GetCurrentCancellationTokenOrDefault());
Subscriber.ThrowDisposedDuringOperationIfNull();
}
catch (Exception e)
Expand Down Expand Up @@ -1099,14 +1181,16 @@ private void OnSfuTrackUnpublished(TrackUnpublished trackUnpublished)
var sessionId = trackUnpublished.SessionId;
var type = trackUnpublished.Type.ToPublicEnum();
var cause = trackUnpublished.Cause;

// StreamTODO: test if this works well with other user muting this user
var updateLocalParticipantState = cause != TrackUnpublishReason.Unspecified && cause != TrackUnpublishReason.UserMuted;
var updateLocalParticipantState
= cause != TrackUnpublishReason.Unspecified && cause != TrackUnpublishReason.UserMuted;

// Optionally available. Read TrackUnpublished.participant comment in events.proto
var participantSfuDto = trackUnpublished.Participant;

UpdateParticipantTracksState(userId, sessionId, type, isEnabled: false, updateLocalParticipantState, out var participant);
UpdateParticipantTracksState(userId, sessionId, type, isEnabled: false, updateLocalParticipantState,
out var participant);

if (participantSfuDto != null && participant != null)
{
Expand All @@ -1127,7 +1211,8 @@ private void OnSfuTrackPublished(TrackPublished trackPublished)
// Optionally available. Read TrackUnpublished.participant comment in events.proto
var participantSfuDto = trackPublished.Participant;

UpdateParticipantTracksState(userId, sessionId, type, isEnabled: true, updateLocalParticipantState: true, out var participant);
UpdateParticipantTracksState(userId, sessionId, type, isEnabled: true, updateLocalParticipantState: true,
out var participant);

if (participantSfuDto != null && participant != null)
{
Expand Down Expand Up @@ -1498,7 +1583,7 @@ private async void OnPublisherNegotiationNeeded()
$"{nameof(Publisher.SignalingState)} state is not stable, current state: {Publisher.SignalingState}");
}

if(GetCurrentCancellationTokenOrDefault().IsCancellationRequested)
if (GetCurrentCancellationTokenOrDefault().IsCancellationRequested)
{
return;
}
Expand Down Expand Up @@ -1885,7 +1970,7 @@ private void OnPublisherVideoTrackChanged(VideoStreamTrack videoTrack)
{
PublisherVideoTrackChanged?.Invoke();
}

void PublisherOnDisconnected()
{
if (CallState == CallingState.Joined || CallState == CallingState.Joining)
Expand All @@ -1912,7 +1997,7 @@ private static bool AssertCallIdMatch(IStreamCall activeCall, string callId, ILo

return true;
}


private void SubscribeToSfuEvents()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ public async Task<RTCSessionDescription> CreateAnswerAsync(CancellationToken can
public void Update()
{
//StreamTodo: investigate if this Blit is necessary
// One reason was to easy control target resolution -> we don't accept every target resolution because small res can crash Android video encoder
// We should check if WebCamTexture allows setting any resolution
if (_publisherVideoTrackTexture != null && _mediaInputProvider.VideoInput != null)
{
Graphics.Blit(_mediaInputProvider.VideoInput, _publisherVideoTrackTexture);
Expand Down Expand Up @@ -273,6 +275,11 @@ public void Dispose()

if (_publisherVideoTrackTexture != null)
{
// Unity gives warning when releasing an active texture
if (RenderTexture.active == _publisherVideoTrackTexture)
{
RenderTexture.active = null;
}
_publisherVideoTrackTexture.Release();
_publisherVideoTrackTexture = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,17 @@ public interface IStreamVideoCallParticipant : IStreamStatefulModel, IHasCustomD
/// </summary>
/// <param name="videoResolution">Video resolution you wish to receive for this participant. You can use a predefined size or pick a predefined one from <see cref="VideoResolution"/></param>
void UpdateRequestedVideoResolution(VideoResolution videoResolution);

/// <summary>
/// Should video track of this participant be received.
/// </summary>
/// <param name="enabled">If enabled, the video stream will be requested from the server</param>
void SetIncomingVideoEnabled(bool enabled);

/// <summary>
/// Should audio track of this participant be received
/// </summary>
/// <param name="enabled">If enabled, the audio stream will be requested from the server</param>
void SetIncomingAudioEnabled(bool enabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ internal void UpdateFromSfu(JoinResponse joinResponse)
internal void UpdateFromSfu(ParticipantJoined participantJoined, ICache cache)
{
var participant = Session.UpdateFromSfu(participantJoined, cache);
LowLevelClient.RtcSession.NotifyParticipantJoined(participantJoined.Participant.SessionId);
UpdateSortedParticipants();

ParticipantJoined?.Invoke(participant);
}

Expand All @@ -678,6 +680,8 @@ internal void UpdateFromSfu(ParticipantLeft participantLeft, ICache cache)
Logs.Warning("Error when generating debug log: " + e.Message);
}

LowLevelClient.RtcSession.NotifyParticipantLeft(participantLeft.Participant.SessionId);

var participant = Session.UpdateFromSfu(participantLeft, cache);

_localPinsSessionIds.RemoveAll(participant.sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ public IEnumerable<IStreamTrack> GetTracks()
yield return _screenShareTrack;
}
}

public void SetIncomingVideoEnabled(bool enabled)
=> LowLevelClient.RtcSession.UpdateIncomingVideoRequested(SessionId, enabled);

public void SetIncomingAudioEnabled(bool enabled)
=> LowLevelClient.RtcSession.UpdateIncomingAudioRequested(SessionId, enabled);

public void UpdateRequestedVideoResolution(VideoResolution videoResolution)
=> LowLevelClient.RtcSession.UpdateRequestedVideoResolution(SessionId, videoResolution);
Expand Down
Loading
Loading