Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions common/src/main/java/com/pedro/common/ConnectChecker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,20 @@ interface ConnectChecker: BitrateChecker {
fun onDisconnect()
fun onAuthError()
fun onAuthSuccess()

/**
* Typed transport event from the sender layer.
*
* Provides machine-readable signal types so callers can differentiate between
* queue-pressure events ([TransportEvent.QueueOverflow]) and network errors
* ([TransportEvent.NetworkSendError]).
*
* Default implementation is a no-op so existing [ConnectChecker] implementors
* are not required to override this method. A [TransportEvent.NetworkSendError] is
* always accompanied by a corresponding [onConnectionFailed] call for backward
* compatibility.
*
* @param event Typed transport event; use exhaustive `when` for dispatch.
*/
fun onTransportEvent(event: TransportEvent) {}
}
52 changes: 52 additions & 0 deletions common/src/main/java/com/pedro/common/FrameLifecycleListener.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2024 pedroSG94.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pedro.common

import com.pedro.common.frame.MediaFrame

/**
* Lifecycle callback for frames sent through a [com.pedro.common.base.BaseSender].
*
* The callback is invoked from the **sender's dispatch thread** (IO coroutine) after the
* sender has fully consumed a [MediaFrame] — i.e., after the frame data has been encoded
* into protocol packets and written to the network socket. At that point the [MediaFrame.data]
* buffer is no longer referenced by the sender and may be safely returned to a buffer pool.
*
* ## Thread guarantee
* [onFrameConsumed] is called on the sender's internal coroutine dispatcher (typically
* [kotlinx.coroutines.Dispatchers.IO]). Implementations must not block this thread.
*
* ## Usage with FramePayloadPool (RTMP pooled-copy)
* ```kotlin
* rtmpSender.frameLifecycleListener = FrameLifecycleListener { frame ->
* (frame.data as? PooledBuffer)?.release()
* }
* ```
*/
fun interface FrameLifecycleListener {

/**
* Called after [frame] has been fully consumed by the sender.
*
* The sender will not access [frame] or [MediaFrame.data] after this call.
* The implementation may safely release, recycle, or pool the underlying buffer.
*
* @param frame The frame that was consumed. Same instance that was passed to
* [com.pedro.common.base.BaseSender.sendMediaFrame].
*/
fun onFrameConsumed(frame: MediaFrame)
}
42 changes: 42 additions & 0 deletions common/src/main/java/com/pedro/common/QueueSnapshot.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2024 pedroSG94.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pedro.common

/**
* Point-in-time snapshot of a sender's internal frame queue.
*
* Returned by [com.pedro.common.base.BaseSender.getQueueSnapshot].
* Safe to pass across threads; all fields are immutable.
*
* @param capacity Maximum number of frames the queue can hold.
* @param items Number of frames currently queued.
* @param softThresholdPercent Usage % at which congestion enters soft state (default 70).
* @param hardThresholdPercent Usage % at which congestion enters hard state (default 85).
*/
data class QueueSnapshot(
val capacity: Int,
val items: Int,
val softThresholdPercent: Float = 70f,
val hardThresholdPercent: Float = 85f,
) {
/** Usage in [0.0, 1.0]. 0.0 when capacity is zero. */
val usageRatio: Double
get() = if (capacity > 0) items.toDouble() / capacity else 0.0

/** Human-readable summary for log lines. */
fun summary(): String = "$items/$capacity (${"%.0f".format(usageRatio * 100)}%)"
}
62 changes: 62 additions & 0 deletions common/src/main/java/com/pedro/common/TransportEvent.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2024 pedroSG94.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pedro.common

/**
* Typed transport events emitted by [com.pedro.common.base.BaseSender].
*
* These supplement [ConnectChecker.onConnectionFailed] with machine-readable signal types,
* allowing callers to differentiate between queue-pressure events (which do not require a
* network reconnect) and actual network send failures (which do).
*
* Designed as a sealed class so all variants are exhaustive-checked by callers.
*/
sealed class TransportEvent {

/**
* The sender's internal frame queue has reached capacity and rejected a frame.
*
* This does not indicate a network failure. The connection is healthy; the sender
* cannot consume frames fast enough. Callers should reduce frame rate or bitrate,
* request a keyframe, or clear the queue — but NOT reconnect.
*
* @param droppedVideo Total dropped video frames of this type since sender start.
* @param droppedAudio Total dropped audio frames of this type since sender start.
* @param queueCapacity Maximum number of frames the queue can hold.
* @param queueSize Number of frames currently in the queue.
*/
data class QueueOverflow(
val droppedVideo: Long,
val droppedAudio: Long,
val queueCapacity: Int,
val queueSize: Int,
) : TransportEvent()

/**
* A network-level send error occurred inside the sender dispatch loop.
*
* This is equivalent to the existing [ConnectChecker.onConnectionFailed] call and
* is emitted in addition to (not instead of) that callback for typed handling.
*
* @param message Human-readable error description.
* @param cause Original exception, if available.
*/
data class NetworkSendError(
val message: String,
val cause: Throwable? = null,
) : TransportEvent()
}
56 changes: 53 additions & 3 deletions common/src/main/java/com/pedro/common/base/BaseSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package com.pedro.common.base
import android.util.Log
import com.pedro.common.BitrateManager
import com.pedro.common.ConnectChecker
import com.pedro.common.FrameLifecycleListener
import com.pedro.common.QueueSnapshot
import com.pedro.common.StreamBlockingQueue
import com.pedro.common.TransportEvent
import com.pedro.common.frame.MediaFrame
import com.pedro.common.onMainThread
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand All @@ -22,6 +26,8 @@ abstract class BaseSender(

@Volatile
protected var running = false
// Tracks actual queue capacity; updated when resizeCache() replaces the queue.
// getCacheSize() reads this field — it must stay in sync with the live queue object.
private var cacheSize = 400
@Volatile
protected var queue = StreamBlockingQueue(cacheSize)
Expand All @@ -41,6 +47,15 @@ abstract class BaseSender(
@Volatile
protected var bytesSendPerSecond = 0L

/**
* Lifecycle callback invoked after the sender's dispatch thread has fully consumed a
* [MediaFrame]. Register a [FrameLifecycleListener] to receive notifications when the
* frame's [MediaFrame.data] buffer may safely be returned to a pool.
*
* Called from the sender's IO coroutine; implementations must not block.
*/
var frameLifecycleListener: FrameLifecycleListener? = null

abstract fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?)
abstract fun setAudioInfo(sampleRate: Int, isStereo: Boolean)
protected abstract suspend fun onRun()
Expand All @@ -50,17 +65,26 @@ abstract class BaseSender(
if (running && !queue.trySend(mediaFrame)) {
when (mediaFrame.type) {
MediaFrame.Type.VIDEO -> {
Log.i(TAG, "Video frame discarded")
Log.i(TAG, "Video frame discarded (queue full)")
droppedVideoFrames++
}
MediaFrame.Type.AUDIO -> {
Log.i(TAG, "Audio frame discarded")
Log.i(TAG, "Audio frame discarded (queue full)")
droppedAudioFrames++
}
}
}
}

/**
* Called by subclass [onRun] implementations after a [MediaFrame] has been fully
* dispatched to the network socket. Notifies [frameLifecycleListener] so callers can
* recycle the frame's buffer.
*/
protected fun notifyFrameConsumed(frame: MediaFrame) {
frameLifecycleListener?.onFrameConsumed(frame)
}

fun start() {
bitrateManager.reset()
queue.clear()
Expand All @@ -74,6 +98,18 @@ abstract class BaseSender(
delay(timeMillis = 1000)
}
}
val frameEventTask = async {
while (scope.isActive && running) {
val event = TransportEvent.QueueOverflow(
droppedVideo = droppedVideoFrames,
droppedAudio = droppedAudioFrames,
queueCapacity = cacheSize,
queueSize = queue.getSize(),
)
onMainThread { connectChecker.onTransportEvent(event) }
delay(timeMillis = 1500)
}
}
onRun()
}
}
Expand All @@ -93,7 +129,7 @@ abstract class BaseSender(

@Throws(IllegalArgumentException::class)
fun hasCongestion(percentUsed: Float = 20f): Boolean {
if (percentUsed < 0 || percentUsed > 100) throw IllegalArgumentException("the value must be in range 0 to 100")
if (percentUsed !in 0.0..100.0) throw IllegalArgumentException("the value must be in range 0 to 100")
val size = queue.getSize().toFloat()
val remaining = queue.remainingCapacity().toFloat()
val capacity = size + remaining
Expand All @@ -107,12 +143,26 @@ abstract class BaseSender(
val tempQueue = StreamBlockingQueue(newSize)
queue.drainTo(tempQueue)
queue = tempQueue
cacheSize = newSize // keep getCacheSize() in sync with the new queue capacity
}

/**
* Returns the current maximum capacity of the sender frame queue.
* Reflects the value passed to the most recent [resizeCache] call.
*/
fun getCacheSize(): Int = cacheSize

fun getItemsInCache(): Int = queue.getSize()

/**
* Returns a point-in-time snapshot of the sender queue state.
* Thread-safe; values are read atomically from the backing [StreamBlockingQueue].
*/
fun getQueueSnapshot(): QueueSnapshot = QueueSnapshot(
capacity = cacheSize,
items = queue.getSize(),
)

fun clearCache() {
queue.clear()
}
Expand Down
8 changes: 8 additions & 0 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,18 @@ class RtmpSender(
bytesSend += size
bytesSendPerSecond += size
}
// Notify lifecycle listener: frame fully consumed, buffer may be recycled.
notifyFrameConsumed(mediaFrame)
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}")
connectChecker.onTransportEvent(
com.pedro.common.TransportEvent.NetworkSendError(
message = "Error send packet, ${error.validMessage()}",
cause = error,
)
)
}
Log.e(TAG, "send error: ", error)
running = false
Expand Down
8 changes: 8 additions & 0 deletions rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,18 @@ class RtspSender(
Log.i(TAG, "wrote $type packet, size $size")
}
}
// Notify lifecycle listener: frame fully consumed, buffer may be recycled.
notifyFrameConsumed(mediaFrame)
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}")
connectChecker.onTransportEvent(
com.pedro.common.TransportEvent.NetworkSendError(
message = "Error send packet, ${error.validMessage()}",
cause = error,
)
)
}
Log.e(TAG, "send error: ", error)
running = false
Expand Down
8 changes: 8 additions & 0 deletions srt/src/main/java/com/pedro/srt/srt/SrtSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,18 @@ class SrtSender(
bytesSend += bytesPsi + bytes
bytesSendPerSecond += bytesPsi + bytes
}
// Notify lifecycle listener: frame fully consumed, buffer may be recycled.
notifyFrameConsumed(mediaFrame)
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}")
connectChecker.onTransportEvent(
com.pedro.common.TransportEvent.NetworkSendError(
message = "Error send packet, ${error.validMessage()}",
cause = error,
)
)
}
Log.e(TAG, "send error: ", error)
running = false
Expand Down
8 changes: 8 additions & 0 deletions udp/src/main/java/com/pedro/udp/UdpSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,18 @@ class UdpSender(
bytesSend += bytesPsi + bytes
bytesSendPerSecond += bytesPsi + bytes
}
// Notify lifecycle listener: frame fully consumed, buffer may be recycled.
notifyFrameConsumed(mediaFrame)
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}")
connectChecker.onTransportEvent(
com.pedro.common.TransportEvent.NetworkSendError(
message = "Error send packet, ${error.validMessage()}",
cause = error,
)
)
}
Log.e(TAG, "send error: ", error)
running = false
Expand Down