Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Multi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import io.smallrye.mutiny.subscription.MultiSubscriber
import java.util.concurrent.Flow.Subscription
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
Expand All @@ -32,7 +32,7 @@ import kotlinx.coroutines.launch
fun <T> Multi<T>.asFlow(
bufferCapacity: Int = Channel.UNLIMITED,
bufferOverflowStrategy: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T> = callbackFlow<T> {
): Flow<T> = callbackFlow {
val parentCtx = coroutineContext

val subscriber = object : MultiSubscriber<T> {
Expand Down Expand Up @@ -77,7 +77,7 @@ fun <T> Multi<T>.asFlow(
* without respecting the requested amount of the subscriber.
*/
suspend fun <T> Flow<T>.asMulti(): Multi<T> {
val parentCtx = coroutineContext
val parentCtx = currentCoroutineContext()
return Multi.createFrom().emitter { em: MultiEmitter<in T> ->
val job = CoroutineScope(parentCtx).launch {
try {
Expand All @@ -101,4 +101,4 @@ suspend fun <T> Flow<T>.asMulti(): Multi<T> {
}
}

private class NonPropagatingCancellationException : kotlin.coroutines.cancellation.CancellationException()
private class NonPropagatingCancellationException : CancellationException()
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ import kotlinx.coroutines.CancellationException
internal inline fun suppressCancellationException(block: () -> Unit) =
try {
block()
} catch (e: CancellationException) {
} catch (_: CancellationException) {
// CancellationExceptions are likely to happen if an emitter processes during cancellation/unsubscription.
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class UniReplaceWithUnitTest {
@Test
fun `test an Uni that's already Void`() {
// Given
val uni = Uni.createFrom().voidItem();
val uni = Uni.createFrom().voidItem()

// When
val subscriber = UniAssertSubscriber.create<Any>()
Expand Down
Loading